深入理解Spark:核心思想与源码分析. 3.7 创建和启动DAGScheduler

    xiaoxiao2024-01-06  182

    3.7 创建和启动DAGScheduler

    DAGScheduler主要用于在任务正式交给TaskSchedulerImpl提交之前做一些准备工作,包括:创建Job,将DAG中的RDD划分到不同的Stage,提交Stage,等等。创建DAGScheduler的代码如下。

    // SparkContext field holding the scheduler; @volatile because it is written during
    // SparkContext construction and read from other threads.
    @volatile private[spark] var dagScheduler: DAGScheduler = _

        // Inside SparkContext initialization: `this` is the SparkContext.
        dagScheduler = new DAGScheduler(this)

    DAGScheduler的数据结构主要维护jobId和stageId的关系、Stage、ActiveJob,以及缓存的RDD的partitions的位置信息,见代码清单3-32。

    代码清单3-32 DAGScheduler维护的数据结构

    // Monotonically increasing generator for job IDs.
    private[scheduler] val nextJobId = new AtomicInteger(0)

    // Number of jobs submitted so far (equal to the next job ID to be handed out).
    private[scheduler] def numTotalJobs: Int = nextJobId.get()

    // Monotonically increasing generator for stage IDs.
    private val nextStageId = new AtomicInteger(0)

     

    // jobId -> IDs of all stages belonging to that job.
    private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]

    // stageId -> Stage object.
    private[scheduler] val stageIdToStage = new HashMap[Int, Stage]

    // Presumably shuffleId -> the map-side Stage producing that shuffle's output.
    // NOTE(review): key semantics not visible in this excerpt — confirm in full source.
    private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]

    // jobId -> ActiveJob, for jobs that have not yet finished or been cancelled.
    private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

     

        // Stages we need to run whose parents aren't done
        private[scheduler] val waitingStages = new HashSet[Stage]

        // Stages we are running right now
        private[scheduler] val runningStages = new HashSet[Stage]

        // Stages that must be resubmitted due to fetch failures
        private[scheduler] val failedStages = new HashSet[Stage]

     

        // All jobs currently active in this scheduler.
        private[scheduler] val activeJobs = new HashSet[ActiveJob]

     

        // Contains the locations that each RDD's partitions are cached on
        private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

        // String key -> epoch of last recorded failure; used to discard stale
        // failure notifications. NOTE(review): whether the key is an executor ID
        // or a host name is not visible here — confirm in full source.
        private val failedEpoch = new HashMap[String, Long]

     

        // Supervisor actor that creates and monitors DAGSchedulerEventProcessActor
        // (see Listing 3-34 below).
        private val dagSchedulerActorSupervisor =

            env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

     

        // Serializer instance used for task closures.
        private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

    在构造DAGScheduler的时候会调用initializeEventProcessActor方法创建DAGSchedulerEventProcessActor,见代码清单3-33。

    代码清单3-33 DAGSchedulerEventProcessActor的初始化

        // Event-processing actor; assigned exactly once by initializeEventProcessActor().
        private[scheduler] var eventProcessActor: ActorRef = _

    // Asks the supervisor (Akka ask pattern) to create DAGSchedulerEventProcessActor
    // and blocks until the reply arrives, so eventProcessActor is non-null before
    // any job can be submitted.
    private def initializeEventProcessActor() {

            // blocking the thread until supervisor is started, which ensures eventProcessActor is

            // not null before any job is submitted

            implicit val timeout = Timeout(30 seconds)

            // The supervisor replies with the ActorRef of the child it spawned from these Props.
            val initEventActorReply =

                dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))

            // Synchronous wait on the ask future; the reply is an untyped Any, hence the cast.
            eventProcessActor = Await.result(initEventActorReply, timeout.duration).

                asInstanceOf[ActorRef]

    }

     

    // Invoked during DAGScheduler construction.
    initializeEventProcessActor()

    这里的DAGSchedulerActorSupervisor主要作为DAGSchedulerEventProcessActor的监管者,负责生成DAGSchedulerEventProcessActor。从代码清单3-34可以看出,DAGSchedulerActorSupervisor对于DAGSchedulerEventProcessActor采用了Akka的一对一监管策略。DAGSchedulerActorSupervisor一旦生成DAGSchedulerEventProcessActor,并注册到ActorSystem,ActorSystem就会调用DAGSchedulerEventProcessActor的preStart,taskScheduler于是就持有了dagScheduler,见代码清单3-35。从代码清单3-35我们还看到DAGSchedulerEventProcessActor所能处理的消息类型,比如JobSubmitted、BeginEvent、CompletionEvent等。DAGSchedulerEventProcessActor接受这些消息后会有不同的处理动作。在本章,读者只需要理解到这里即可,后面章节用到时会详细分析。

    代码清单3-34 DAGSchedulerActorSupervisor的监管策略

    // Supervisor for DAGSchedulerEventProcessActor: spawns it on request and, if it
    // ever throws, treats the failure as fatal for the whole application.
    private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)

        extends Actor with Logging {

     

        // One-for-one strategy: any Exception from the supervised child shuts the
        // application down — cancel all jobs, stop the SparkContext, then Stop the child.
        override val supervisorStrategy =

            OneForOneStrategy() {

                case x: Exception =>

                    // "eventProcesserActor" spelling is in the original log message; left as-is.
                    logError("eventProcesserActor failed; shutting down SparkContext", x)

                    try {

                        dagScheduler.doCancelAllJobs()

                    } catch {

                        // Best-effort: a failure while cancelling must not prevent the stop below.
                        case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)

                    }

                    dagScheduler.sc.stop()

                    Stop

        }

     

    // On receiving Props, spawn (and thereby supervise) a child actor from them and
    // reply with its ActorRef; anything else is logged and ignored.
    def receive = {

            case p: Props => sender ! context.actorOf(p)

            case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")

        }

    }

    代码清单3-35 DAGSchedulerEventProcessActor的实现

    private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)

        extends Actor with Logging {

        // Called by Akka once this actor starts: hand the DAGScheduler to the
        // TaskScheduler so task lifecycle events can be reported back to it.
        override def preStart() {

            dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)

        }

        /**
        * The main event loop of the DAG scheduler.
        *
        * Each message type is delegated to the matching handle* method on the
        * wrapped DAGScheduler; the actor itself holds no state of its own.
        */

        def receive = {

            // A new job was submitted: build stages and an ActiveJob for it.
            case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>

                dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,

                    listener, properties)

            // Cancellation requests at stage / job / job-group / global granularity.
            case StageCancelled(stageId) =>

                dagScheduler.handleStageCancellation(stageId)

            case JobCancelled(jobId) =>

                dagScheduler.handleJobCancellation(jobId)

            case JobGroupCancelled(groupId) =>

                dagScheduler.handleJobGroupCancelled(groupId)

            case AllJobsCancelled =>

                dagScheduler.doCancelAllJobs()

            // Cluster membership changes reported by the task scheduler.
            case ExecutorAdded(execId, host) =>

                dagScheduler.handleExecutorAdded(execId, host)

            case ExecutorLost(execId) =>

                dagScheduler.handleExecutorLost(execId, fetchFailed = false)

            // Per-task lifecycle events.
            case BeginEvent(task, taskInfo) =>

                dagScheduler.handleBeginEvent(task, taskInfo)

            case GettingResultEvent(taskInfo) =>

                dagScheduler.handleGetTaskResult(taskInfo)

            case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>

                dagScheduler.handleTaskCompletion(completion)

            case TaskSetFailed(taskSet, reason) =>

                dagScheduler.handleTaskSetFailed(taskSet, reason)

            // Re-run stages that previously failed (e.g. due to fetch failures).
            case ResubmitFailedStages =>

                dagScheduler.resubmitFailedStages()

    }

    // Called by Akka when this actor is stopped: clean up any jobs still active.
    override def postStop() {

        // Cancel any active jobs in postStop hook

        dagScheduler.cleanUpAfterSchedulerStop()

    }

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)