“源码走读系列”从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用。因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm.
WordCountTopology 使用storm来统计文件中的每个单词的出现次数。
通过该例子来说明tuple发送时的几个要素
source component 发送源destination component 接收者stream 消息通道tuple 消息本身本文涉及到的开发环境搭建可以参考前面的两篇博文。
arch linux简明安装指南在archlinux上搭建storm cluster其实对文件中的单词进行统计是Linux下一个很常见的任务,用awk就可以轻松的解决(如果文件不是太大的话),下面是进行word counting的awk脚本,将其保存为名为wordcount.awk文件。
wordcount.awk
{ for (i = 1; i<=NF; i++) freq[$i]++ } END{ for (word in freq) printf "%s\t%d\n",word,freq[word] }运行该脚本,对文件中的单词进行统计
gawk -f wordcount.awk filename从github上复制内容
git clone https://github.com/nathanmarz/storm-starter.git编译运行
lein deps lein compile java -cp $(lein classpath) WordCountTopologymain函数的主要内容
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));注意:grouping操作的时候,如果没有显示指定stream id,则使用的是default stream. 如shuffleGrouping("spout")表示从名为spout的component中接收从default stream发送过来的tuple.
在原始版本中,spout不停的向split bolt随机发送句子,Count bolt统计每个单词出现的次数。
那么能不能让Spout在读取完文件之后,通知下游的bolt显示最柊的统计结果呢?
要想达到上述的改进目标,采用如上图所示的结构即可。改变的地方如下,
在Spout中添加一个SUCCESS_STREAM添加只有一个运行实例的statistics bolt当spout读取完文件内容之后,通过SUCCESS_STREAM告诉statistics bolt,文件已经处理完毕,可以打印当前的统计结果添加SUCCESS_STREAM
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); declarer.declareStream("SUCCESS_STREAM",new Fields("word")); }使用SUCCESS_STREAM通知下游,文件处理完毕
@Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; if ( count == sentences.length ) { System.out.println(count+" try to emit tuple by success_stream"); _collector.emit("SUCCESS_STREAM",new Values(sentences[0])); count++; }else if ( count < sentences.length ){ _collector.emit(new Values(sentences[count])); count++; } }
将spout的并行数由5改为1
builder.setSpout("spout", new RandomSentenceSpout(), 1);在原有的Topology中添加WordCount2 Bolt
builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");WordCount2 Bolt会接收从Count Bolt通过default stream发送的tuple,同时接收Spout通过SUCCESS_STREAM发送的tuple,也就是说wordcount2会接收从两个stream来的数据。
编译修改后的源文件
cd $STROM_STARTER lein compile storm.starter可能会出现以下异常信息,该异常可以忽略。
Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:在local模式下运行修改后的WordCountTopology
java -cp $(lein classpath) storm.starter.WordCountTopology如果一切正常,日志如下所示,线程的名字可能会有所不同。
moon 1 score 1 cow 1 doctor 1 over 1 nature 1 snow 1 four 1 keeps 1 with 1 a 1 white 1 dwarfs 1 at 1 the 4 and 2 i 1 two 1 away 1 seven 2 apple 1 am 1 an 1 jumped 1 day 1 years 1 ago 1可以将WordCountTopology的运行结果和awk脚本的运行结果相比对,结果应该是一致的。
本文通过BasicDRPCTopology的实例来分析DRPCTopology在提交的时候, Topology中究竟含有哪些内容?
DRPC 分布式远程调用(这个说法有意思,远程调用本来就是分布的,何须再加个D, <头文字D>看多了, :)
public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); Config conf = new Config(); if (args == null || args.length == 0) { LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); for (String word : new String[]{ "hello", "goodbye" }) { System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word)); } cluster.shutdown(); drpc.shutdown(); } else { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology()); } }
问题: 上面的代码中只是添加了一个bolt,并没有设定Spout. 我们知道一个topology中最起码得有一个Spout,那么这里的Spout又隐身于何处呢?
关键的地方就在builder.createLocalTopology, 调用关系如下
LinearDRPCTopologyBuilder::createLocalTopology LinearDRPCTopologyBuilder::createTopology() LinearDRPCTopologyBuilder::createTopology(new DRPCSpout(_function))
原来DRPCTopology中使用的Spout是DRPCSpout.
既然代码已经读到此处,何不再进一步看看createTopology的实现.
简要说明一下该段代码的处理逻辑:
设置DRPCSpout以bolt为入参,创建CoordinatedBolt添加JoinResult Bolt添加ReturnResult Bolt: ReturnResultBolt连接到DRPCServer,并返回结果 private StormTopology createTopology(DRPCSpout spout) { final String SPOUT_ID = "spout"; final String PREPARE_ID = "prepare-request"; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID, spout); builder.setBolt(PREPARE_ID, new PrepareRequest()) .noneGrouping(SPOUT_ID); int i=0; for(; i<_components.size();i++) { Component component = _components.get(i); Map<String, SourceArgs> source = new HashMap<String, SourceArgs>(); if (i==1) { source.put(boltId(i-1), SourceArgs.single()); } else if (i>=2) { source.put(boltId(i-1), SourceArgs.all()); } IdStreamSpec idSpec = null; if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) { idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM); } BoltDeclarer declarer = builder.setBolt( boltId(i), new CoordinatedBolt(component.bolt, source, idSpec), component.parallelism); for(Map conf: component.componentConfs) { declarer.addConfigurations(conf); } if(idSpec!=null) { declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request")); } if(i==0 && component.declarations.isEmpty()) { declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM); } else { String prevId; if(i==0) { prevId = PREPARE_ID; } else { prevId = boltId(i-1); } for(InputDeclaration declaration: component.declarations) { declaration.declare(prevId, declarer); } } if(i>0) { declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID); } } IRichBolt lastBolt = _components.get(_components.size()-1).bolt; OutputFieldsGetter getter = new OutputFieldsGetter(); lastBolt.declareOutputFields(getter); Map<String, StreamInfo> streams = getter.getFieldsDeclaration(); if(streams.size()!=1) { throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology"); } String outputStream = streams.keySet().iterator().next(); List<String> fields = streams.get(outputStream).get_output_fields(); if(fields.size()!=2) { throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result."); } builder.setBolt(boltId(i), new JoinResult(PREPARE_ID)) .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0))) .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request")); i++; builder.setBolt(boltId(i), new ReturnResults()) .noneGrouping(boltId(i-1)); return builder.createTopology(); }
处理逻辑: 在接收到的每一个单词后面添加'!'.
public static class ExclaimBolt extends BaseBasicBolt { @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } }介绍TridentTopology的使用,重点分析newDRPCStream和stateQuery的实现机理。
使用TridentTopology进行数据处理的时候,经常会使用State来保存一些状态,这些保存起来的State通过stateQuery来进行查询。问题恰恰在这里产生,即对state进行更新的Stream和尔后进行stateQuery的Stream并非同一个,那么它们之间是如何关联起来的呢。
在TridentTopology中,有一些Processor可能会同处于一个Bolt中,这些Processor形成一个processing chain, 那么Tuple又是如何在这些Processor之间进行传递的呢。
在整个topology中,有两个不同的spout。
此图有好几个问题
PartitionPersistProcessor和StateQueryProcessor同处于一个bolt,该bolt为SubtopologyBoltSubtopologyBolt有来自多个不同Stream的输入,根据不同的Streamid找到对应的InitialReceiverdrpcspout在执行的时候,是一直不停的emit消息到SubtopologyBolt,还是发送完一次消息就停止发送不同的tuple,其sourcestream不一样,根据SourceStream,找到对应的InitialReceiver
Map<String, InitialReceiver> _roots = new HashMap();
进行状态更新的Processor名为PartitionPersistProcessor
记录哪些tuple需要进行状态更新
状态真正更新是发生在finishBatch阶段
persistentAggregate
PartitionPersistProcessor
SubtopologyBolt::execute PartitionPersistProcessor::finishBatch _updater::updateState Snapshottable::update当状态更新的时候,状态查询是否会发生?
进行状态查询的Processor名为StateQueryProcessor
查询的时候,首先调用batchRetrieive来获得最新的状态更新结果,再对每个最新的结果使用_function来进行处理。
调用层次
SubtopologyBolt::finishBatch StateQueryProcessor::finishBatch _function.batchRetrieve_function.execute 将处理过的结果发送给下一跳进行处理
如何决定bolt内部的哪个processor来处理接收到的消息,这个是根据不同的Stream来判断InitialReceiver完成。
当SubtopologyBolt接收到最原始的tuple时,根据streamid找到InitialReceiver后,InitialReceiver在receive函数中作的第一件事情就是根据tuple来创建一个tridenttuple,tridenttuple会被处在同一个SubtopologyBolt中的processor一一处理,处理的结果是保存在tridenttuple和processorcontext中。
ProcessorContext记录两个重要的信息,即当前的batchId和batchState.
public class ProcessorContext { public Object batchId; public Object[] state; public ProcessorContext(Object batchId, Object[] state) { this.batchId = batchId; this.state = state; } }tridentcollector在emit的时候将消息由各个TupleReceiver进行处理。目前仅有BridgeReceiver实现了该接口。
BridgeReceiver负责将消息发送给另外的Bolt进行处理。这里说的“另外的Bolt”是指Vanilla Topology中的Bolt.
相关资源:python入门教程(PDF版)