博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm中的Worker
阅读量:6293 次
发布时间:2019-06-22

本文共 5515 字,大约阅读时间需要 18 分钟。

hot3.png

    Storm中worker是实际执行Topology的进程,它由supervisor启动,从zookeeper中获取分配到自身的所有Executor并启动这些Executor来执行。

 

worker中的数据

通过worker-data方法定义了一个包含很多共享数据的映射集合,worker中的很多方法都依赖它;

worker中的计时器

每个计时器都对应着一个java线程,worker中使用计时器进行心跳保持以及获取元数据的更新信息。

worker的心跳

do-heartbeat函数用于产生worker的心跳信息,这些信息被写入本地文件系统中,supervisor会读取这些心跳信息以判断worker的状态,然后决定是否要重启worker.

worker-state方法会创建一个LocalState对象,并调用该对象的put方法将worker的心跳信息存储到本地的文件系统,路径:STORM-LOCAL-DIR/workers/<workerId>/heartbeats,

worker的heartbeat目录下的文件,文件名为当前的时间戳。

Executor的心跳

与worker的心跳不同,它的心跳信息直接发送到zookeeper中保存,主要保存了Executor中Task的运行统计。

 do-executor-heartbeats函数用来发送一次心跳信息

构建Executor的心跳对象,包含如下信息:

storm-id:topologyId

executor-stats:该worker中executor的运行统计,具体为对每一个Task的统计

uptime:worker的启动时间

time-secs:当前时间

调用storm-cluster-state的worker-heartbeat()方法存储心跳信息,在zookeeper中的默认路径为:/storm/workerbeats/<storm-id>/<node-port>

worker使用executor-heartbeat-timer计时器线程来发送Executor的心跳信息,默认为3秒发送一次;

worker中对ZMQ连接的维护

     在进程期间,Storm利用ZMQ来发送和接收信息,并且采用端到端的方法完成消息传输,Worker会根据Topology的定义以及分配到自身的任务情况,计算出自己发出的消息将被哪些Task接收,基于Topology的这一任务分配信息,worker可以熟悉目标Task所在的机器和端口号

    可以看出,Worker通过两种机制来保证连接的可靠性,一是在zookeeper中注册watcher回调通知的方法,这种方式并不一定可靠,例如与zookeeper的连接丢失,则注册的watcher回调方法将失效。二是采用定时器的方法定期执行该函数。

 

从ZooKeeper获取Topology的活跃情况

worker需要获得其执行的Topology的状态,refresh-storm-active函数用于获取topology的状态信息。

mk-halting-timer函数用于调用mk-timer函数来创建一个计时器,该计时器会在遇到错误的时候将错误信息记录到日志中并退出JVM

 

创建Worker

mk-worker函数用于创建Worker进程,主要工作包括启动相应的计时器,创建Worker中对应的Executor,以及启动接收线程来接收消息

过程:

    若为分布式模式,则将打印到控制台的信息打到日志里面;

    若为分布式模式,则将与Worker对应的进程ID放到pids目录下,并创建以进程ID为ID作为文件名的空文件。Supervisor在关闭Worker时会尝试关闭pids目录下面所有与进程ID相对应的进程,Worker创建的子进程也应遵循这样的规则,在Storm中,由于任务会被重新调度,因此正在执行的worker也可能被关闭;

    分别启动Worker以及Executor的心跳计时器线程,这里是预先调用一次,以确保第一次的信条信息可以被快速发送出去,然后启动计时器线程来完成周期性的心跳更新;

创建用于完成ZMQ连接更新的计时器线程;

启动消息的接收线程,receive-thread-shutdown为该线程的关闭函数;

启动消息队列的发送线程;

 

关闭worker

理解Worker关闭函数有利于进一步理解Worker中启动的线程及资源

过程:

1关闭缓存的ZMQ的Socket连接

2关闭消息接收线程

3关闭worker中的所有Executor线程

4关闭ZMQ的上下文,释放已经创建的Socket连接

5关闭消息发送队列和线程

6关闭所有的计时器线程

7关闭资源

8从ZooKeeper中清除该Worker的心跳信息

9断开与ZooKeeper的连接

 

重要的辅助方法介绍

 

   创建worker中的数据结构,启动Worker,关闭Worker的过程中会用到很多辅助方法,这些方法有助于我们更好地理解Worker的工作原理:

worker中的接收函数

Worker中的mk-transfer-local-fn函数用于生产并发送消息到Executor的接收队列,同一个worker内部的Executor之间会通过该函数传递消息。

short-executor-receive-queue-map存储Executor中第一个Task的taskid到该Executor对应的接收队列(Distuptor Queue)的映射关系。

task-getter函数以ZMQ 发来的消息为传入参数,这里的消息为一个含有两个元素的数组,第一个元素为TaskId,task-getter函数的目标是通过消息的taskId获得与其对应的Executor中第一个Task的TaskId,第二个元素为消息的实际内容。

定义函数体,函数的输入为ZMQ收到的一组消息tuple-batch,按照与消息Taskid对应的Executor中第一个Task的TaskId对消息进行分组,其变量grouped对应的键为Executor中第一个Task的Taskid,值为属于该executor的一组消息;

通过executor中第一个task的taskid获得与Executor相对应的接收消息队列q,调用disruptor/publish方法将收到的消息发送至队列q中

fast-group-by方法:

该方法传入一个函数afn和一个列表alist,当前情况下afn为task-getter函数,alist对应于接收的消息。

Worker中的发送函数

 worker中的mk-transfer-fn函数与mk-transfer-local-fn类似,主要有用于Executor的数据发送:这里分两种情况:

1.消息的目标TaskId跟发送TaskId属于同一个Worker,此时不需要跨进程传输消息,因此可以将消息直接发送至接收端Executor的接受队列。

2消息的目标Taskid跟发送TaskId属于不同的worker,此时则将消息发送至worker的发送队列,由worker负责将队列中的消息通过ZMQ发送出去。

Worker消息队列是如何接收消息的呢?Worker中会有一个额外的线程对transfer-queue进行监听,函数mk-transfer-tuples-handler用于创建于Disruptor queue对应的消息处理器。

 

过程:

    drainer列表用于缓存要发送的消息,Disruptor Queue的Onevent回调会调用本函数定义的方法,该方法的最后一个参数标识Queue是否为一个Batch结束,并会在Batch结束之前将消息缓存到drainer列表中。

node+port->socket保存了Worker中与目标node+port相对应的ZMQ Socket连接,node+port代表nimbus的资源分配单位,node则表示一台运行Supervisor的机器,port为该Supervisor上某一个运行Worker的端口号.

task->node+port为从taskId到node+port的映射关系。

endpoint-socket-lock为Worker中定义的ReentrantReadWriteLick类型的锁,Worker中存在一个专门的线程,会对缓存的ZMQ连接进行更新。

 

 

clojure-handler,函数第一个传入参数为一组消息packets,第三个表明Batch是否结束。

将消息packets放入drainer变量中,若batch-end为true,则为了避免跟ZMQ连接更新线程相冲突,这里需要申请读取endpoint-socket-lock锁,然后遍历drainer中缓存的所有消息,根据消息的taskId找到node+port,然后通过从node+port到ZMQSocket的映射关系找到对应Socket连接

调用msg/send函数将消息发送出去

清理drainer缓存

Worker中,使用transfer-tuples,transfer-thread来启动发送监听线程

获取属于Worker的Executor

    read-worker-executors函数用来计算分配到该Worker的Executor,它通过调用Storm-cluster-state的assignment-info函数获得所有Topology的分配信息,然后利用worker的assignemtn-id以及port进行过滤,得到某个worker所属的Executor,这里的assignment-id对应于node,Worker启动后,其执行的Executor集合将不再发生变化,但当任务分配情况发生变化时,Supervisor就会重启worker来处理任务。其中,Nimbus在计算分吴分配时会尽量不改变Worker中已执行的Executor。当前Worker中任何一个Executor处理失败都会导致Worker重启。

 

创建Executor中接收消息队列和查找表

    mk-receive-queue-map函数用于为Worker中的每一个Executor创建接收队列,并将其存入hash表,其中键为ExecutorId,值为Disruptor Queue的对象;

    ExecutorId实际上为含有两个元素的数据,即[startTaskId,endTaskId],表示该Executor执行的任务区间。

worker中映射关系的创建:

用mk-receive-queue-map创建Disruptor Queue

调用executor-id->tasks函数获得Executor中包含的Taskid集合,并创建hash,键为taskId,值为Tasidk所属的Executor的接收队列;

获得Executor中包含的TaskID集合,即为receive-queue-map的键集合;

构建一个新的hash,存储从Executor的看是TaskId到该Executor的接收队列的映射关系;

构建Executor中从TaksId到Executor的起始TaskId的映射关系;

 

对于接收到的一组消息,根据其taskId找到Executor的起始Taskid并根据其进行消息分组,然后根据从起始TaskId到Executor接收队列的哈希表short-executor-receive-queue-map来进行消息的分发。

 

在Worker中,TaskId,Executor以及Executor的收发队列的哈希表的关系如下:

:executor-receive-queue-map:[startTaskId,endTaskId]->Executor接收队列;

:short-executor-receive-queue-map:[startTaskId]->Executor接收队列

:receive-queue-map:[taskId]->Executor接收队列

:task->short-executor:[taskId]->[startTaskId]

 

下载Topology的配置项以及代码

   在执行一个topology时,Supervisor将从Nimbus下载3个文件,他们分别为stormconf.ser,stomrcode.ser,stormjar.jar:

stormconf.ser为Topology配置项的序列化文件,read-supervisor-storm-conf函数用于读取该文件并将其反序列化,

stormcode.ser为Topology的定义文件,可通过read-supervisor-topology函数来读取该文件,并将其反序列化解析

从nimbus上下载这些文件在Supervisor所在机器的路径为:

$StormRoot/stormData/supervisor/stormdist/<stormid>/

stormjar.jar文件包含了用户的资源文件,第三方库等,会解压,并放置于运行目录的resources文件夹下$StormRoot/stormData/supervisor/stormdist/<stormid>/resources

 

worker中的线程及其通信关系:

161623_sRav_1866807.png

over

 

转载于:https://my.oschina.net/iioschina/blog/821559

你可能感兴趣的文章
Phantom.js维护者退出,项目的未来成疑
查看>>
Datical为数据库添加持续交付能力
查看>>
当中台遇上DDD,我们该如何设计微服务?
查看>>
2016年云巴产品更新合集
查看>>
央视在世界杯高清直播中占了C位 它是怎么做到的?
查看>>
新书问答:Company-Wide Agility
查看>>
Oracle将关闭Java.net和Kenai.com社区
查看>>
机器人操作系统来到Windows
查看>>
Propel项目改为基于TensorFlow.js
查看>>
Azure正式对外发布容器服务,支持Swarm和Mesos
查看>>
阿里巴巴收购以色列VR公司,大厂死磕VR为哪般?
查看>>
埃隆·马斯克:比特币拥有着“极为出色”的结构,而纸质货币终将消失
查看>>
如何用度量影响敏捷环境
查看>>
Facebook使用机器学习手段来自动优化其系统性能
查看>>
借助Unity AR Foundation构建跨平台AR应用
查看>>
Kubernetes 落地案例|使用 Kubernetes 重新部署全球最大的教育公司
查看>>
手工测试对比自动化测试
查看>>
vue.js快速入门
查看>>
浅论服务端应用程序开发中的CAP思想(非分布式系统中的CAP理论)
查看>>
socket.io的 Python客户端中文encode问题
查看>>