Storm源码浅析之topology的提交

    xiaoxiao2024-04-17  6

    原文: http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html     作者:dennis (killme2008@gmail.com)     转载请注明出处。     最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。        一、介绍     Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:       180  text files.       177  unique files.                                                   7  files ignored. http: // cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s) ------------------------------------------------------------------------------- Language                     files          blank        comment           code ------------------------------------------------------------------------------- Java                            125             5010             2414            25661 Lisp                             33              732              283             4871 Python                            7              742              433             4675 CSS                               1               12               45             1837 ruby                              2               22                0              104 Bourne Shell                      1                0                0                6 Javascript                        2                1               15                6 ------------------------------------------------------------------------------- SUM:                            171             6519             3190            37160 -------------------------------------------------------------------------------     Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。          二、Topology和Nimbus            Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交: StormSubmitter.submitTopology(name, conf, builder.createTopology());     我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令: bin / storm jar xxxx.jar com.taobao.MyTopology args     将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到: def  jar(jarfile, klass,  * args):                                                                                                                                   exec_storm_class(                                                                                                                                                   klass,                                                                                                                                                         jvmtype = " -client " ,                                                                                                                                             extrajars = [jarfile, CONF_DIR, STORM_DIR  +   " /bin " ],                                                                                                             args = args,                                                                                                                                                     prefix ="export STORM_JAR=" + jarfile + ";" )      将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到: // StormSubmitter.java  private   static   void  submitJar(Map conf) {          if (submittedJar == null ) {             LOG.info( " Jar not uploaded to master yet. Submitting jar " );             String localJar  =  System.getenv("STORM_JAR" );             submittedJar  =  submitJar(conf, localJar);         }  else  {             LOG.info( " Jar already uploaded to master. Not submitting jar. " );         }     }     通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。     其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录, nimbus数据目录的结构: - nimbus       - inbox           - stormjar - 57f1d694 - 2865 - 4b3b - 8a7c - 99104fc0aea3.jar           - stormjar - 76b4e316 - b430 - 4215 - 9e26 - 4f33ba4ee520.jar       - stormdist          - storm - id             - stormjar.jar             - stormconf.ser             - stormcode.ser      其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。 这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。     进入重点,topology任务的分配过程(zookeeper路径说明忽略root): 1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。 2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4] 3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息 4.开始分配任务(内部称为assignment), 具体步骤:  (1)从zk上获得已有的assignment(新的toplogy当然没有了)  (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。  (3)将任务均匀地分配给可用的worker,这里有两种情况:  (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配 { 1 : [host1:port1]  2  : [host2:port1]           3  : [host1:port1]  4  : [host2:port1]} ,可以看到任务平均地分配在两个worker上。 (b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序, 将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成 [host1:port1 host2:port1 host1:port2 host2:port2] ,然后分配任务为 { 1 : host1:port1 ,  2  : host2:port2}

    (4)记录启动时间 (5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。 5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。 6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

    文章转自庄周梦蝶  ,原文发布时间2011-12-01

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)