先定义两个接口和一个类 TaskMessage类本身比较好理解, 抽象storm的message格式 对于IContext, 注释也说了, 定义messaging plugin, 通过什么渠道去发送message, storm这里设计成可替换的 默认定义storm实现了local和ZMQ两种plugin, 当然你可以实现更多的 local应该是用于local mode, 而ZMQ用于distributed mode
IContext接口主要是用于创建IConnection, 体现对socket的管理, 分别通过bind和connect定义服务器端和客户端的connection IConnection接口主要用于定义, 真正收发message的逻辑
最终通过TransportFactory, 根据Config.STORM_MESSAGING_TRANSPORT的配置, 利用Java的reflection动态的创建不同类型的context
TaskMessage如其名, 包含task和message字段, 以说明发送给哪个task的message 并且定义了序列化和反序列化的函数
public class TaskMessage { private int _task; private byte[] _message; public TaskMessage(int task, byte[] message) { _task = task; _message = message; } public int task() { return _task; } public byte[] message() { return _message; } public ByteBuffer serialize() { ByteBuffer bb = ByteBuffer.allocate(_message.length+2); bb.putShort((short)_task); bb.put(_message); return bb; } public void deserialize(ByteBuffer packet) { if (packet==null) return; _task = packet.getShort(); _message = new byte[packet.limit()-2]; packet.get(_message); } }
可以详细看看local和ZMQ的plugin的实现
在local模式下使用的message plugin 实现比较简单, 所有都基于queues-map来实现, 这里的queue直接使用LinkedBlockingQueue, 因为local用于测试, 不用考虑高效性 所有的接收队列或发送队列都通过add-queue!加到queues-map里面(stormid+port作为key) 那么所有的recv和send, 都是基于queue的操作
(defn add-queue! [queues-map lock storm-id port] (let [id (str storm-id "-" port)] (locking lock (when-not (contains? @queues-map id) (swap! queues-map assoc id (LinkedBlockingQueue.)))) (@queues-map id))) (deftype LocalConnection [storm-id port queues-map lock queue] IConnection (^TaskMessage recv [this ^int flags] (when-not queue (throw (IllegalArgumentException. "Cannot receive on this socket"))) (if (= flags 1) (.poll queue) (.take queue))) (^void send [this ^int taskId ^bytes payload] (let [send-queue (add-queue! queues-map lock storm-id port)] (.put send-queue (TaskMessage. taskId payload)) )) (^void close [this] )) (deftype LocalContext [^{:unsynchronized-mutable true} queues-map ^{:unsynchronized-mutable true} lock] IContext (^void prepare [this ^Map storm-conf] (set! queues-map (atom {})) (set! lock (Object.))) (^IConnection bind [this ^String storm-id ^int port] (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port))) (^IConnection connect [this ^String storm-id ^String host ^int port] (LocalConnection. storm-id port queues-map lock nil)) (^void term [this] ))这里使用Deftype, 而不是Defrecord, 即connection和context本身不需要对字典的支持 并且在IContext的实现中, 使用到了可变field, 据说是比较难用对的高级特性 我个人的理解, 是因为deftype和defrecord一样, 没有闭包的效果, 而只有field(对象成员)可以随时被接口函数访问, 所以有些场景下需要field的mutable, 比如这里的queues-map 之前类似的场景都是用reify实现的, 这里给出用deftype实现的版本
号称最快的消息队列, 接近socket API 的性能, 参考http://www.cnblogs.com/yjf512/archive/2012/03/03/2378024.html 在distributed mode时, storm使用ZMQ作为进程间和instrance间通信
(deftype ZMQConnection [socket] IConnection (^TaskMessage recv [this ^int flags] (require 'backtype.storm.messaging.zmq) (if-let [packet (mq/recv socket flags)] (parse-packet packet))) (^void send [this ^int taskId ^bytes payload] (require 'backtype.storm.messaging.zmq) (mq/send socket (mk-packet taskId payload) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears (^void close [this] (.close socket))) (deftype ZMQContext [^{:unsynchronized-mutable true} context ^{:unsynchronized-mutable true} linger-ms ^{:unsynchronized-mutable true} hwm ^{:unsynchronized-mutable true} local?] IContext (^void prepare [this ^Map storm-conf] (let [num-threads (storm-conf ZMQ-THREADS)] (set! context (mq/context num-threads)) (set! linger-ms (storm-conf ZMQ-LINGER-MILLIS)) (set! hwm (storm-conf ZMQ-HWM)) (set! local? (= (storm-conf STORM-CLUSTER-MODE) "local")))) (^IConnection bind [this ^String storm-id ^int port] (require 'backtype.storm.messaging.zmq) (-> context (mq/socket mq/pull) (mq/set-hwm hwm) (mq/bind (get-bind-zmq-url local? port)) mk-connection )) (^IConnection connect [this ^String storm-id ^String host ^int port] (require 'backtype.storm.messaging.zmq) (-> context (mq/socket mq/push) (mq/set-hwm hwm) (mq/set-linger linger-ms) (mq/connect (get-connect-zmq-url local? host port)) mk-connection)) (^void term [this] (.term context)) ZMQContextQuery (zmq-context [this] context)) 本文章摘自博客园,原文发布日期:2013-07-16 相关资源:敏捷开发V1.0.pptx