Storm Source Code Analysis - Topology Submit - Supervisor


    mk-supervisor

    (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
      (log-message "Starting Supervisor with conf " conf)
      (.prepare isupervisor conf (supervisor-isupervisor-dir conf)) ;; initialize supervisor-id and store it in LocalState (see the ISupervisor implementation)
      (FileUtils/cleanDirectory (File. (supervisor-tmp-dir conf)))  ;; clean this machine's supervisor tmp directory
      (let [supervisor (supervisor-data conf shared-context isupervisor)
            ;; create two event-managers, used to run functions in the background
            [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
            sync-processes (partial sync-processes supervisor) ;; partial sync-processes
            ;; mk-synchronize-supervisor, the main work of mk-supervisor, see below
            synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
            ;; function that publishes the supervisor heartbeat
            heartbeat-fn (fn [] (.supervisor-heartbeat!
                                   (:storm-cluster-state supervisor)
                                   (:supervisor-id supervisor)
                                   (SupervisorInfo. (current-time-secs)
                                                    (:my-hostname supervisor)
                                                    (:assignment-id supervisor)
                                                    (keys @(:curr-assignment supervisor)) ;; used ports
                                                    (.getMetadata isupervisor)
                                                    (conf SUPERVISOR-SCHEDULER-META)
                                                    ((:uptime supervisor)))))]
        ;; call heartbeat-fn once to publish the supervisor hb right away,
        ;; then use schedule-recurring to call heartbeat-fn periodically
        (heartbeat-fn)
        ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
        (schedule-recurring (:timer supervisor)
                            0
                            (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                            heartbeat-fn))
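    The shape of this "heartbeat once now, then again on a fixed period" pattern can be sketched without Storm's timer. The sketch below uses a plain thread via future; start-heartbeats!, freq-secs and stop-flag are made-up names, not Storm's API:

    ;; Minimal sketch: publish one heartbeat immediately, then repeat every
    ;; freq-secs until stop-flag is set. Illustrative only, not Storm's timer.
    (defn start-heartbeats! [heartbeat-fn freq-secs stop-flag]
      (heartbeat-fn)                          ;; first hb right away, like mk-supervisor
      (future
        (loop []
          (Thread/sleep (* 1000 freq-secs))
          (when-not @stop-flag
            (heartbeat-fn)                    ;; periodic hb, like schedule-recurring
            (recur)))))

    (comment
      (def stop-flag (atom false))
      (start-heartbeats!
        #(println "supervisor hb at" (System/currentTimeMillis))
        5
        stop-flag)
      (reset! stop-flag true))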

    mk-synchronize-supervisor

    The supervisor itself is fairly simple; it is mainly responsible for two things: when the assignment changes, sync the topology code from nimbus to the local machine; and when the assignment changes, check the workers' status and make sure every assigned worker is in the valid state.

    This implies two requirements:  1. Trigger when the assignment changes. For how this repeatable triggering mechanism is built on ZooKeeper watchers, see "Storm Source Code Analysis - ZooKeeper Usage in Storm".

    2. Because both tasks are time-consuming, run them in the background. Two event-managers are created, used to run mk-synchronize-supervisor and sync-processes in the background respectively (a toy sketch of such an event manager follows).
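    What an event-manager does here can be approximated by a queue plus a single consumer thread. The sketch below is only a toy stand-in for event/event-manager (make-event-manager and add-event are made-up names), just to show why adding a function to it means "run this in the background, one at a time":

    (import 'java.util.concurrent.LinkedBlockingQueue)

    ;; Toy event manager: add-event enqueues a no-arg function, and a single
    ;; daemon thread drains the queue and runs each function in order.
    (defn make-event-manager []
      (let [^LinkedBlockingQueue queue (LinkedBlockingQueue.)]
        (doto (Thread.
                (fn []
                  (loop []
                    ((.take queue))   ;; blocks until a function is available, then runs it
                    (recur))))
          (.setDaemon true)
          (.start))
        {:queue queue}))

    (defn add-event [manager f]
      (.put ^LinkedBlockingQueue (:queue manager) f))

    (comment
      (def em (make-event-manager))
      (add-event em #(println "running synchronize-supervisor in the background")))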

    One unusual thing about mk-synchronize-supervisor is that its body is wrapped in a named anonymous function, this. It looks surprising at first, but the purpose is to let sync-callback add this very function back onto the event-manager; in other words, every time it runs it re-registers sync-callback with ZooKeeper, so the next change will trigger it again.
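    The trick can be shown in isolation. In this sketch, register-watch! and fire-change! are made-up stand-ins for ZooKeeper's one-shot watches; the named anonymous fn this re-arms the watch from inside its own callback, which is the role it plays in mk-synchronize-supervisor (there the callback adds this to the event-manager instead of calling it directly):

    ;; Stand-in for ZooKeeper's one-shot watches: each registered callback
    ;; fires at most once and must be registered again to fire on the next change.
    (def watchers (atom []))

    (defn register-watch! [callback]
      (swap! watchers conj callback))

    (defn fire-change! []
      (let [cbs @watchers]
        (reset! watchers [])
        (doseq [cb cbs] (cb))))

    ;; The mk-synchronize-supervisor pattern: a named anonymous fn whose callback
    ;; refers to `this`, so every run re-arms the watch for the next change.
    (defn mk-sync-fn [work-fn]
      (fn this []
        (register-watch! (fn [& _] (this)))
        (work-fn)))

    (comment
      (def sync! (mk-sync-fn #(println "synchronizing assignments")))
      (sync!)          ;; runs once and registers the watch
      (fire-change!)   ;; a change fires the callback, which runs `this` and re-arms
      (fire-change!))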

     

    (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
      (fn this []
        (let [conf (:conf supervisor)
              storm-cluster-state (:storm-cluster-state supervisor)
              ^ISupervisor isupervisor (:isupervisor supervisor)
              ^LocalState local-state (:local-state supervisor) ;; local cache (LocalState database)
              sync-callback (fn [& ignored] (.add event-manager this)) ;; callback that runs mk-synchronize-supervisor ("this") again in the background
              assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback) ;; read assignments and register the callback, triggered when assignments change in zk
              storm-code-map (read-storm-code-locations assignments-snapshot) ;; where to download topology code from
              downloaded-storm-ids (set (read-downloaded-storm-ids conf)) ;; which topologies have already been downloaded
              all-assignment (read-assignments ;; which executors are assigned to this supervisor's ports
                               assignments-snapshot
                               (:assignment-id supervisor)) ;; supervisor-id
              new-assignment (->> all-assignment ;; new = all, because confirmAssigned has no real implementation and always returns true
                                  (filter-key #(.confirmAssigned isupervisor %)))
              assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment) ;; set of topology ids assigned to this supervisor
              existing-assignment (.get local-state LS-LOCAL-ASSIGNMENTS)] ;; read the currently saved local assignments from the local-state database
          ;; download the code of newly assigned topologies
          (doseq [[storm-id master-code-dir] storm-code-map]
            (when (and (not (downloaded-storm-ids storm-id))
                       (assigned-storm-ids storm-id))
              (download-storm-code conf storm-id master-code-dir)))
          (.put local-state ;; save new-assignment into the local-state database
                LS-LOCAL-ASSIGNMENTS
                new-assignment)
          (reset! (:curr-assignment supervisor) new-assignment) ;; cache new-assignment on the supervisor object
          ;; delete topology code that is no longer needed
          ;; remove any downloaded code that's no longer assigned or active
          (doseq [storm-id downloaded-storm-ids]
            (when-not (assigned-storm-ids storm-id)
              (log-message "Removing code for storm id " storm-id)
              (rmr (supervisor-stormdist-root conf storm-id))
              ))
          ;; run sync-processes in the background
          (.add processes-event-manager sync-processes)
          )))
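    The download/cleanup decision above is just set logic over three pieces of data. A sketch with plain sets and maps in place of ZooKeeper and LocalState (the sample data and the code-actions name are illustrative):

    ;; Which topology code to download and which to delete, mirroring the two
    ;; doseq blocks in mk-synchronize-supervisor.
    (defn code-actions [downloaded-storm-ids assigned-storm-ids storm-code-map]
      {:to-download (for [[storm-id master-code-dir] storm-code-map
                          :when (and (not (downloaded-storm-ids storm-id))
                                     (assigned-storm-ids storm-id))]
                      [storm-id master-code-dir])
       :to-remove   (remove assigned-storm-ids downloaded-storm-ids)})

    (comment
      (code-actions #{"topo-1" "topo-old"}               ;; already on disk
                    #{"topo-1" "topo-2"}                 ;; assigned to this supervisor
                    {"topo-2" "/nimbus/stormdist/topo-2"})
      ;; => {:to-download (["topo-2" "/nimbus/stormdist/topo-2"])
      ;;     :to-remove   ("topo-old")}
    )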

    sync-processes

    sync-processes manages the workers: it handles abnormal or dead workers and launches new ones. It first reads the workers' heartbeats from the local state to judge their condition, shuts down every worker whose state is not valid, and launches a new worker for every slot that is assigned but whose worker is not in the valid state. (The small map helpers it relies on are sketched below.)
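    sync-processes leans on two small map helpers, filter-val and select-keys-pred. The re-implementations below are only meant to pin down the semantics assumed here (a value filter and a key filter); the sample data is made up:

    ;; Assumed semantics: filter-val keeps entries whose value satisfies pred,
    ;; select-keys-pred keeps entries whose key satisfies pred.
    (defn filter-val [pred m]
      (into {} (filter (fn [[_ v]] (pred v)) m)))

    (defn select-keys-pred [pred m]
      (into {} (filter (fn [[k _]] (pred k)) m)))

    (comment
      ;; allocated: worker-id -> [state heartbeat]
      (def allocated {"w-1" [:valid {:port 6700}]
                      "w-2" [:timed-out {:port 6701}]})
      (def keepers (filter-val (fn [[state _]] (= state :valid)) allocated))
      ;; => {"w-1" [:valid {:port 6700}]}
      (def keep-ports (set (for [[_ [_ hb]] keepers] (:port hb))))
      ;; => #{6700}
      ;; executors on ports outside keep-ports must go to new workers
      (select-keys-pred (complement keep-ports) {6700 [[1 1]] 6701 [[2 2]]})
      ;; => {6701 [[2 2]]}
    )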

    (defn sync-processes [supervisor]
      (let [conf (:conf supervisor)
            ^LocalState local-state (:local-state supervisor)
            assigned-executors (defaulted (.get local-state LS-LOCAL-ASSIGNMENTS) {})
            now (current-time-secs)
            allocated (read-allocated-workers supervisor assigned-executors now) ;; 1. read the current state of the workers
            keepers (filter-val ;; keep the workers whose state is :valid
                      (fn [[state _]] (= state :valid))
                      allocated)
            keep-ports (set (for [[id [_ hb]] keepers] (:port hb))) ;; set of ports used by the keepers
            ;; select-keys-pred (pred map): filter the map's keys with pred
            ;; find the executors in assigned-executors whose port is not in keep-ports,
            ;; i.e. newly assigned workers, or assigned workers whose state is not valid (dead or not started);
            ;; these executors must be reassigned to new workers
            reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
            new-worker-ids (into {}
                                 (for [port (keys reassign-executors)] ;; generate a new worker-id for each reassigned port
                                   [port (uuid)]))
            ]
        ;; 1. to kill are those in allocated that are dead or disallowed
        ;; 2. kill the ones that should be dead
        ;;    - read pids, kill -9 and individually remove file
        ;;    - rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log)
        ;; 3. of the rest, figure out what assignments aren't yet satisfied
        ;; 4. generate new worker ids, write new "approved workers" to LS
        ;; 5. create local dir for worker id
        ;; 5. launch new workers (give worker-id, port, and supervisor-id)
        ;; 6. wait for workers launch
        (doseq [[id [state heartbeat]] allocated]
          (when (not= :valid state) ;; shut down every worker whose state is not :valid
            (shutdown-worker supervisor id)))
        (doseq [id (vals new-worker-ids)]
          (local-mkdirs (worker-pids-root conf id))) ;; create directories for the new workers, and add them to LS-APPROVED-WORKERS in local-state
        (.put local-state LS-APPROVED-WORKERS ;; updated approved workers = the ones still valid + the new workers
              (merge
                (select-keys (.get local-state LS-APPROVED-WORKERS) ;; existing approved workers whose state is valid
                             (keys keepers))
                (zipmap (vals new-worker-ids) (keys new-worker-ids)) ;; new workers
                ))
        (wait-for-workers-launch ;; 2. wait-for-workers-launch
          conf
          (dofor [[port assignment] reassign-executors]
            (let [id (new-worker-ids port)]
              (launch-worker supervisor
                             (:storm-id assignment)
                             port
                             id)
              id)))
        ))

    1. read-allocated-workers

    (defn read-allocated-workers
      "Returns map from worker id to worker heartbeat. if the heartbeat is nil, then the worker is dead (timed out or never wrote heartbeat)"
      [supervisor assigned-executors now]
      (let [conf (:conf supervisor)
            ^LocalState local-state (:local-state supervisor)
            id->heartbeat (read-worker-heartbeats conf) ;; read each worker's hb from local state (each worker process keeps updating its own local hb)
            approved-ids (set (keys (.get local-state LS-APPROVED-WORKERS)))] ;; read the approved workers from local-state
        (into {}
              (dofor [[id hb] id->heartbeat] ;; judge each worker's current state from its hb
                (let [state (cond
                              (or (not (contains? approved-ids id))
                                  (not (matches-an-assignment? hb assigned-executors)))
                                :disallowed ;; not allowed
                              (not hb)
                                :not-started ;; no hb, never started
                              (> (- now (:time-secs hb))
                                 (conf SUPERVISOR-WORKER-TIMEOUT-SECS))
                                :timed-out ;; timed out, dead
                              true
                                :valid)]
                  (log-debug "Worker " id " is " state ": " (pr-str hb) " at supervisor time-secs " now)
                  [id [state hb]] ;; return each worker's current state and hb
                  ))
              )))
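    The cond above reduces to a small decision function. A standalone version for illustration (classify-worker and its parameters are made-up stand-ins for the pieces read-allocated-workers pulls from local-state and the conf):

    ;; Mirror of the cond in read-allocated-workers, with the inputs made explicit.
    (defn classify-worker [approved? matches-assignment? timeout-secs now hb]
      (cond
        (or (not approved?) (not matches-assignment?)) :disallowed
        (not hb)                                       :not-started
        (> (- now (:time-secs hb)) timeout-secs)       :timed-out
        :else                                          :valid))

    (comment
      (classify-worker true true 30 1000 {:time-secs 990})  ;; => :valid
      (classify-worker true true 30 1000 {:time-secs 900})  ;; => :timed-out
      (classify-worker true true 30 1000 nil)               ;; => :not-started
      (classify-worker false true 30 1000 {:time-secs 990}) ;; => :disallowed
    )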

     

    2. wait-for-workers-launch

    launch-worker is called for every new worker id in reassign-executors,

    and finally wait-for-workers-launch is called to wait for the workers to be launched successfully.

    The logic is straightforward: check the hb; if it is not there yet, keep sleeping until the timeout, then log that the worker failed to start.

    (defn- wait-for-worker-launch [conf id start-time]
      (let [state (worker-state conf id)]
        (loop []
          (let [hb (.get state LS-WORKER-HEARTBEAT)]
            (when (and
                    (not hb)
                    (<
                      (- (current-time-secs) start-time)
                      (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)
                      ))
              (log-message id " still hasn't started")
              (Time/sleep 500)
              (recur)
              )))
        (when-not (.get state LS-WORKER-HEARTBEAT)
          (log-message "Worker " id " failed to start")
          )))

    (defn- wait-for-workers-launch [conf ids]
      (let [start-time (current-time-secs)]
        (doseq [id ids]
          (wait-for-worker-launch conf id start-time))
        ))

    This article is excerpted from 博客园 (cnblogs); original publication date: 2013-06-28.