博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
流量计算-Jstorm提交Topology过程(下一个)
阅读量:5364 次
发布时间:2019-06-15

本文共 4697 字,大约阅读时间需要 15 分钟。

马上部分,

5、上篇任务已经ServiceHandler.submitTopologyWithOpts()方法。在该方法中,会实例化一个TopologyAssignEvent,相当于创建了一个topology级别的作业,然后将其保存到TopologyAssign的任务队列中。详细代码例如以下:

TopologyAssignEvent assignEvent = new TopologyAssignEvent();			assignEvent.setTopologyId(topologyId);			assignEvent.setScratch(false);			assignEvent.setTopologyName(topologyname);			assignEvent.setOldStatus(Thrift					.topologyInitialStatusToStormStatus(options							.get_initial_status()));			TopologyAssign.push(assignEvent);
6、TopologyAssign是Jstorm一个任务分配器。它会依据配置和Topology中spout和bolt的关系来进行Task的创建和分配,可是详细任务的创建和非配并发其自身完毕的,二是调用Jstorm自身的调度器完毕的。当然Jstorm同意用户依据自己业务需求定制调度器,关于Jstorm的调度器分析会本人专门写一篇文章,此处暂不做不论什么说明。回到TopologyAssign。该类是一个实现了Runnable接口的后台线程。随着Nimbus启动,主要完毕topology作业分配、备份和作业均衡的作用。当天还是通过Jstorm的调度器来完毕的。其run方法会採用堵塞的方式获取自身作业队列中的作业,然后进行作业分配,其作业分配核心业务例如以下

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {		String topologyId = event.getTopologyId();		TopologyAssignContext context = prepareTopologyAssign(event);		//ResourceWorkerSlot是worker的抽象。封装了worker和其task		Set
assignments = null; IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME); //通过Jstorm的调度来计算任务的分配 assignments = scheduler.assignTasks(context); Assignment assignment = null; Map
nodeHost = getTopologyNodeHost( context.getCluster(), context.getOldAssignment(), assignments); Map
startTimes = getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments); //获取提交到集群的jar包地址,Worker运行任务时须要下载代码 String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId); assignment = new Assignment(codeDir, assignments, nodeHost, startTimes); StormClusterState stormClusterState = nimbusData.getStormClusterState(); //将分配好的任务上传到ZK,通知supervisor stormClusterState.set_assignment(topologyId, assignment); //更新Task的開始时间 NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId); // 更新元信息到ZK if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_MONITOR) NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment); else metricsMonitor(event); return assignment; }
7、Nimbus已经将任务分配好了。而且创建到ZK上,此时就须要supervisor认领自己的任务了,supervisor获取任务的详细逻辑封装在SyncSupervisorEvent,其也是一个后台线程,会不停获取ZK上(JSTORM_ROOT/assignments下)的所有任务,然后把自己的任务保存到本地磁盘上。再通过NimbusClient把topology的代码保存到本地,然后启动worker启动线程来运行任务,详细业务逻辑代码例如以下

public void run() {			RunnableCallback syncCallback = new EventManagerZkPusher(this,					syncSupEventManager);			/**			 *首次启动时主动获取ZK上JSTORM_ROOT/assignments的所有任务,兴许通过ZK的watch以一种回调的方式获取任务,			 */			Map
assignments = Cluster.get_all_assignment( stormClusterState, syncCallback); /** *获取本地已经下载的topology */ List
downloadedTopologyIds = StormConfig .get_supervisor_toplogy_list(conf); /** * 在所有作业中。获取自身的作业 */ Map
localAssignment = getLocalAssign( stormClusterState, supervisorId, assignments); /** * 将作业保存到本地磁盘 */ localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment); // 获取topology的代码下载地址 Map
topologyCodes = getTopologyCodeLocations( assignments, supervisorId); //通过NimbusClient将代码下载到本地 downloadTopology(topologyCodes, downloadedTopologyIds); /** * 删除没用的topology */ removeUselessTopology(topologyCodes, downloadedTopologyIds); /** * 将syncProcesses加到运行队列。syncProcesses复杂启动新的worker来运行任务 */ processEventManager.add(syncProcesses); }
8、SyncSupervisorEvent将自己的作业选出来,并保存到本地之后,再由SyncProcessEvent来启动worker运行详细的作业。SyncProcessEvent主要干两件事。启动新的worker。杀死没用的worker。此处要涉及启动新的Worker,详细业务逻辑例如以下

private void startNewWorkers(Set
keepPorts, Map
localAssignments) throws Exception { /** * 获取本次新分配的作业 */ Map
newWorkers = JStormUtils .select_keys_pred(keepPorts, localAssignments); /** * 给每一个新作业生成一个ID */ Map
newWorkerIds = new HashMap
(); for (Entry
entry : newWorkers.entrySet()) { Integer port = entry.getKey(); LocalAssignment assignment = entry.getValue(); String workerId = UUID.randomUUID().toString(); newWorkerIds.put(port, workerId); //保存每一个Worker的ID到本地 StormConfig.worker_pids_root(conf, workerId); //启动新的JVM运行作业 launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, assignment); }
以上就是Jstorm提交一个topology的过程。这两篇文章仅仅是给出了一条主线。具体的代码逻辑并未具体给出,兴许会不断完好,同一时候关于Jstrom的调度器兴许也会给详细分析

版权声明:本文博客原创文章。博客,未经同意,不得转载。

转载于:https://www.cnblogs.com/blfshiye/p/4623034.html

你可能感兴趣的文章
05BeautifulSoup遍历文档书及搜索文档树
查看>>
e3mall商城的归纳总结1之项目的架构
查看>>
CF798
查看>>
Codeforces 294C 组合数学
查看>>
网页上播放mp3或flash
查看>>
MapReduce教程(一)基于MapReduce框架开发<转>
查看>>
洛谷P1829 [国家集训队]Crash的数字表格 / JZPTAB(莫比乌斯反演)
查看>>
关于字符串为空的判断条件
查看>>
使用sysprep.exe制作GHO或WIM镜像
查看>>
netstat命令的使用详解
查看>>
[LeetCode#72]Edit Distance
查看>>
.NET备份博客园随笔分类文章
查看>>
Ubuntu窗口大小调节方法
查看>>
English Learning -->英语词汇记忆10大规则<思维导图>
查看>>
HDOJ(HDU) 2148 Score(比较、)
查看>>
大不了高三艹个FZU
查看>>
S2_SQL_第一章
查看>>
基础计算几何
查看>>
nodeJS socket-io服务搭建 /socket.io/socket.io.js 404的问题
查看>>
tomcat和Jetty的比較
查看>>