Storm中worker是实际执行Topology的进程,它由supervisor启动,从zookeeper中获取分配到自身的所有Executor并启动这些Executor来执行。
worker中的数据
通过worker-data方法定义了一个包含很多共享数据的映射集合,worker中的很多方法都依赖它;
worker中的计时器
每个计时器都对应着一个java线程,worker中使用计时器进行心跳保持以及获取元数据的更新信息。
worker的心跳
do-heartbeat函数用于产生worker的心跳信息,这些信息被写入本地文件系统中,supervisor会读取这些心跳信息以判断worker的状态,然后决定是否要重启worker.
worker-state方法会创建一个LocalState对象,并调用该对象的put方法将worker的心跳信息存储到本地的文件系统,路径:STORM-LOCAL-DIR/workers/<workerId>/heartbeats,
worker的heartbeat目录下的文件,文件名为当前的时间戳。
Executor的心跳
与worker的心跳不同,它的心跳信息直接发送到zookeeper中保存,主要保存了Executor中Task的运行统计。
do-executor-heartbeats函数用来发送一次心跳信息
构建Executor的心跳对象,包含如下信息:
storm-id:topologyId
executor-stats:该worker中executor的运行统计,具体为对每一个Task的统计
uptime:worker的启动时间
time-secs:当前时间
调用storm-cluster-state的worker-heartbeat()方法存储心跳信息,在zookeeper中的默认路径为:/storm/workerbeats/<storm-id>/<node-port>
worker使用executor-heartbeat-timer计时器线程来发送Executor的心跳信息,默认为3秒发送一次;
worker中对ZMQ连接的维护
在进程期间,Storm利用ZMQ来发送和接收信息,并且采用端到端的方法完成消息传输,Worker会根据Topology的定义以及分配到自身的任务情况,计算出自己发出的消息将被哪些Task接收,基于Topology的这一任务分配信息,worker可以熟悉目标Task所在的机器和端口号
可以看出,Worker通过两种机制来保证连接的可靠性,一是在zookeeper中注册watcher回调通知的方法,这种方式并不一定可靠,例如与zookeeper的连接丢失,则注册的watcher回调方法将失效。二是采用定时器的方法定期执行该函数。
从ZooKeeper获取Topology的活跃情况
worker需要获得其执行的Topology的状态,refresh-storm-active函数用于获取topology的状态信息。
mk-halting-timer函数用于调用mk-timer函数来创建一个计时器,该计时器会在遇到错误的时候将错误信息记录到日志中并退出JVM
创建Worker
mk-worker函数用于创建Worker进程,主要工作包括启动相应的计时器,创建Worker中对应的Executor,以及启动接收线程来接收消息
过程:
若为分布式模式,则将打印到控制台的信息打到日志里面;
若为分布式模式,则将与Worker对应的进程ID放到pids目录下,并创建以进程ID为ID作为文件名的空文件。Supervisor在关闭Worker时会尝试关闭pids目录下面所有与进程ID相对应的进程,Worker创建的子进程也应遵循这样的规则,在Storm中,由于任务会被重新调度,因此正在执行的worker也可能被关闭;
分别启动Worker以及Executor的心跳计时器线程,这里是预先调用一次,以确保第一次的信条信息可以被快速发送出去,然后启动计时器线程来完成周期性的心跳更新;
创建用于完成ZMQ连接更新的计时器线程;
启动消息的接收线程,receive-thread-shutdown为该线程的关闭函数;
启动消息队列的发送线程;
关闭worker
理解Worker关闭函数有利于进一步理解Worker中启动的线程及资源
过程:
1关闭缓存的ZMQ的Socket连接
2关闭消息接收线程
3关闭worker中的所有Executor线程
4关闭ZMQ的上下文,释放已经创建的Socket连接
5关闭消息发送队列和线程
6关闭所有的计时器线程
7关闭资源
8从ZooKeeper中清除该Worker的心跳信息
9断开与ZooKeeper的连接
重要的辅助方法介绍
创建worker中的数据结构,启动Worker,关闭Worker的过程中会用到很多辅助方法,这些方法有助于我们更好地理解Worker的工作原理:
worker中的接收函数
Worker中的mk-transfer-local-fn函数用于生产并发送消息到Executor的接收队列,同一个worker内部的Executor之间会通过该函数传递消息。
short-executor-receive-queue-map存储Executor中第一个Task的taskid到该Executor对应的接收队列(Distuptor Queue)的映射关系。
task-getter函数以ZMQ 发来的消息为传入参数,这里的消息为一个含有两个元素的数组,第一个元素为TaskId,task-getter函数的目标是通过消息的taskId获得与其对应的Executor中第一个Task的TaskId,第二个元素为消息的实际内容。
定义函数体,函数的输入为ZMQ收到的一组消息tuple-batch,按照与消息Taskid对应的Executor中第一个Task的TaskId对消息进行分组,其变量grouped对应的键为Executor中第一个Task的Taskid,值为属于该executor的一组消息;
通过executor中第一个task的taskid获得与Executor相对应的接收消息队列q,调用disruptor/publish方法将收到的消息发送至队列q中
fast-group-by方法:
该方法传入一个函数afn和一个列表alist,当前情况下afn为task-getter函数,alist对应于接收的消息。
Worker中的发送函数
worker中的mk-transfer-fn函数与mk-transfer-local-fn类似,主要有用于Executor的数据发送:这里分两种情况:
1.消息的目标TaskId跟发送TaskId属于同一个Worker,此时不需要跨进程传输消息,因此可以将消息直接发送至接收端Executor的接受队列。
2消息的目标Taskid跟发送TaskId属于不同的worker,此时则将消息发送至worker的发送队列,由worker负责将队列中的消息通过ZMQ发送出去。
Worker消息队列是如何接收消息的呢?Worker中会有一个额外的线程对transfer-queue进行监听,函数mk-transfer-tuples-handler用于创建于Disruptor queue对应的消息处理器。
过程:
drainer列表用于缓存要发送的消息,Disruptor Queue的Onevent回调会调用本函数定义的方法,该方法的最后一个参数标识Queue是否为一个Batch结束,并会在Batch结束之前将消息缓存到drainer列表中。
node+port->socket保存了Worker中与目标node+port相对应的ZMQ Socket连接,node+port代表nimbus的资源分配单位,node则表示一台运行Supervisor的机器,port为该Supervisor上某一个运行Worker的端口号.
task->node+port为从taskId到node+port的映射关系。
endpoint-socket-lock为Worker中定义的ReentrantReadWriteLick类型的锁,Worker中存在一个专门的线程,会对缓存的ZMQ连接进行更新。
clojure-handler,函数第一个传入参数为一组消息packets,第三个表明Batch是否结束。
将消息packets放入drainer变量中,若batch-end为true,则为了避免跟ZMQ连接更新线程相冲突,这里需要申请读取endpoint-socket-lock锁,然后遍历drainer中缓存的所有消息,根据消息的taskId找到node+port,然后通过从node+port到ZMQSocket的映射关系找到对应Socket连接
调用msg/send函数将消息发送出去
清理drainer缓存
Worker中,使用transfer-tuples,transfer-thread来启动发送监听线程
获取属于Worker的Executor
read-worker-executors函数用来计算分配到该Worker的Executor,它通过调用Storm-cluster-state的assignment-info函数获得所有Topology的分配信息,然后利用worker的assignemtn-id以及port进行过滤,得到某个worker所属的Executor,这里的assignment-id对应于node,Worker启动后,其执行的Executor集合将不再发生变化,但当任务分配情况发生变化时,Supervisor就会重启worker来处理任务。其中,Nimbus在计算分吴分配时会尽量不改变Worker中已执行的Executor。当前Worker中任何一个Executor处理失败都会导致Worker重启。
创建Executor中接收消息队列和查找表
mk-receive-queue-map函数用于为Worker中的每一个Executor创建接收队列,并将其存入hash表,其中键为ExecutorId,值为Disruptor Queue的对象;
ExecutorId实际上为含有两个元素的数据,即[startTaskId,endTaskId],表示该Executor执行的任务区间。
worker中映射关系的创建:
用mk-receive-queue-map创建Disruptor Queue
调用executor-id->tasks函数获得Executor中包含的Taskid集合,并创建hash,键为taskId,值为Tasidk所属的Executor的接收队列;
获得Executor中包含的TaskID集合,即为receive-queue-map的键集合;
构建一个新的hash,存储从Executor的看是TaskId到该Executor的接收队列的映射关系;
构建Executor中从TaksId到Executor的起始TaskId的映射关系;
对于接收到的一组消息,根据其taskId找到Executor的起始Taskid并根据其进行消息分组,然后根据从起始TaskId到Executor接收队列的哈希表short-executor-receive-queue-map来进行消息的分发。
在Worker中,TaskId,Executor以及Executor的收发队列的哈希表的关系如下:
:executor-receive-queue-map:[startTaskId,endTaskId]->Executor接收队列;
:short-executor-receive-queue-map:[startTaskId]->Executor接收队列
:receive-queue-map:[taskId]->Executor接收队列
:task->short-executor:[taskId]->[startTaskId]
下载Topology的配置项以及代码
在执行一个topology时,Supervisor将从Nimbus下载3个文件,他们分别为stormconf.ser,stomrcode.ser,stormjar.jar:
stormconf.ser为Topology配置项的序列化文件,read-supervisor-storm-conf函数用于读取该文件并将其反序列化,
stormcode.ser为Topology的定义文件,可通过read-supervisor-topology函数来读取该文件,并将其反序列化解析
从nimbus上下载这些文件在Supervisor所在机器的路径为:
$StormRoot/stormData/supervisor/stormdist/<stormid>/
stormjar.jar文件包含了用户的资源文件,第三方库等,会解压,并放置于运行目录的resources文件夹下$StormRoot/stormData/supervisor/stormdist/<stormid>/resources
worker中的线程及其通信关系:
over