Understanding Spark: Core Ideas and Source Code Analysis. 3.6 Creating the Task Scheduler (TaskScheduler)


    3.6 Creating the Task Scheduler (TaskScheduler)

    TaskScheduler is another key component of SparkContext. It is responsible for submitting tasks and for requesting the cluster manager to schedule them; it can be viewed as the client side of task scheduling. The TaskScheduler is created as follows:

    private[spark] var (schedulerBackend, taskScheduler) =
        SparkContext.createTaskScheduler(this, master)

    The createTaskScheduler method matches the deploy mode against the master setting, creates a TaskSchedulerImpl, and produces a different SchedulerBackend for each mode. To make Spark's initialization flow easier to follow, this chapter uses local mode as its example; the remaining modes are covered in detail in Chapter 7. The code that matches local mode against master is as follows.

    master match {
        case "local" =>
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalBackend(scheduler, 1)
            scheduler.initialize(backend)
            (backend, scheduler)
        // ... the remaining deployment modes are elided here
    }
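    The master string can also request several local threads (for example local[4]) or one thread per core (local[*]); those cases are handled by separate patterns in createTaskScheduler. Below is a minimal, self-contained sketch of this matching style; the regex and the describe helper are illustrative stand-ins, not Spark's actual code.

```scala
// Simplified sketch of master-string pattern matching; names are illustrative.
object MasterMatcher {
  // Matches strings such as "local[4]" and captures the thread count.
  private val LocalN = """local\[([0-9]+)\]""".r

  def describe(master: String): String = master match {
    case "local"    => "local mode, 1 thread"
    case LocalN(n)  => s"local mode, $n threads"
    case "local[*]" => "local mode, one thread per core"
    case other      => s"cluster mode: $other"
  }
}
```

    Because a compiled Regex used as an extractor only succeeds on a full match of digits, the local[*] case never collides with the LocalN pattern.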

    3.6.1 Creating TaskSchedulerImpl

    TaskSchedulerImpl is constructed as follows:

    1) Read configuration from SparkConf, including the number of CPUs allocated per task and the scheduling mode (FAIR or FIFO; the default is FIFO, and it can be changed via the spark.scheduler.mode property).

    2) Create a TaskResultGetter, which processes the task results sent back by Executors on the Workers. It does so through a thread pool (created with Executors.newFixedThreadPool, 4 threads by default, thread names prefixed with task-result-getter, and Executors.defaultThreadFactory as the default thread factory).
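    The result-handling pool described in step 2 can be sketched with JDK concurrency primitives alone. This is a minimal illustration, not Spark's TaskResultGetter: the pool size of 4 and the task-result-getter name prefix follow the defaults described above, while ResultPoolSketch and its members are invented names.

```scala
import java.util.concurrent.{Callable, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// A fixed pool whose worker threads carry a recognizable name prefix, so
// result-handling work is easy to spot in thread dumps.
object ResultPoolSketch {
  private val counter = new AtomicInteger(0)

  private val factory: ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"task-result-getter-${counter.getAndIncrement()}")
      t.setDaemon(true) // do not keep the JVM alive for result handling
      t
    }
  }

  private val pool = Executors.newFixedThreadPool(4, factory)

  // Submit a trivial job and report which thread ran it.
  def nameOfWorkerThread(): String = {
    val f = pool.submit(new Callable[String] {
      override def call(): String = Thread.currentThread().getName
    })
    f.get(5, TimeUnit.SECONDS)
  }
}
```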

    The implementation of TaskSchedulerImpl is shown in Listing 3-29.

    Listing 3-29 Implementation of TaskSchedulerImpl

    var dagScheduler: DAGScheduler = null
    var backend: SchedulerBackend = null
    val mapOutputTracker = SparkEnv.get.mapOutputTracker
    var schedulableBuilder: SchedulableBuilder = null
    var rootPool: Pool = null
    // default scheduler is FIFO
    private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
    val schedulingMode: SchedulingMode = try {
        SchedulingMode.withName(schedulingModeConf.toUpperCase)
    } catch {
        case e: java.util.NoSuchElementException =>
            throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
    }

    // This is a var so that we can reset it for testing purposes.
    private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

    TaskSchedulerImpl supports two scheduling modes: FAIR and FIFO. The actual scheduling of tasks ultimately falls to the concrete implementation of the SchedulerBackend interface. To simplify the analysis, we first look at LocalBackend, the SchedulerBackend implementation used in local mode. LocalBackend relies on LocalActor and the ActorSystem for message passing. The implementation of LocalBackend is shown in Listing 3-30.

    Listing 3-30 Implementation of LocalBackend

    private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
        extends SchedulerBackend with ExecutorBackend {

        private val appId = "local-" + System.currentTimeMillis
        var localActor: ActorRef = null

        override def start() {
            localActor = SparkEnv.get.actorSystem.actorOf(
                Props(new LocalActor(scheduler, this, totalCores)),
                "LocalBackendActor")
        }

        override def stop() {
            localActor ! StopExecutor
        }

        override def reviveOffers() {
            localActor ! ReviveOffers
        }

        override def defaultParallelism() =
            scheduler.conf.getInt("spark.default.parallelism", totalCores)

        override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
            localActor ! KillTask(taskId, interruptThread)
        }

        override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
            localActor ! StatusUpdate(taskId, state, serializedData)
        }

        override def applicationId(): String = appId
    }
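    Every SchedulerBackend method above simply forwards a message to LocalActor, which does the real work asynchronously. The following Akka-free sketch shows that forwarding pattern with a plain blocking queue standing in for the actor mailbox; the message types mirror the ones used above, but MailboxSketch and its drain method are invented for illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Message types modeled on the ones LocalBackend sends to LocalActor.
sealed trait BackendMessage
case object ReviveOffers extends BackendMessage
case class KillTask(taskId: Long, interruptThread: Boolean) extends BackendMessage
case object StopExecutor extends BackendMessage

class MailboxSketch {
  // The queue plays the role of the actor mailbox.
  private val mailbox = new LinkedBlockingQueue[BackendMessage]()

  // Mimic Akka's "!" (tell) operator: enqueue without blocking the sender.
  def !(msg: BackendMessage): Unit = mailbox.put(msg)

  // Drain and "handle" all pending messages, returning a log of what ran.
  def drain(): List[String] = {
    val out = scala.collection.mutable.ListBuffer[String]()
    while (!mailbox.isEmpty) {
      mailbox.take() match {
        case ReviveOffers    => out += "revive"
        case KillTask(id, _) => out += s"kill $id"
        case StopExecutor    => out += "stop"
      }
    }
    out.toList
  }
}
```

    The design point is that the caller of reviveOffers or killTask never waits for the work itself; it only pays the cost of an enqueue.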

    3.6.2 Initializing TaskSchedulerImpl

    After TaskSchedulerImpl and LocalBackend have been created, the initialize method is called on TaskSchedulerImpl. Taking the default FIFO scheduling as an example, TaskSchedulerImpl is initialized as follows:

    1) Give TaskSchedulerImpl a reference to the LocalBackend.

    2) Create the Pool, which caches the scheduling queue, the scheduling algorithm, the set of TaskSetManagers, and related information.

    3) Create the FIFOSchedulableBuilder, which is used to manipulate the scheduling queue inside the Pool.

    The implementation of the initialize method is shown in Listing 3-31.

    Listing 3-31 Initialization of TaskSchedulerImpl

    def initialize(backend: SchedulerBackend) {
        this.backend = backend
        rootPool = new Pool("", schedulingMode, 0, 0)
        schedulableBuilder = {
            schedulingMode match {
                case SchedulingMode.FIFO =>
                    new FIFOSchedulableBuilder(rootPool)
                case SchedulingMode.FAIR =>
                    new FairSchedulableBuilder(rootPool, conf)
            }
        }
        schedulableBuilder.buildPools()
    }
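    Under FIFO, the rootPool built here is ordered by Spark's FIFO scheduling algorithm, which compares schedulables first by priority (the job ID) and then by stage ID, so earlier jobs run first and, within a job, earlier stages run first. A minimal sketch of that two-level comparison, with an invented Entry type standing in for Spark's Schedulable:

```scala
// Stand-in for a schedulable entity: lower priority (job ID) runs first,
// ties broken by lower stage ID.
case class Entry(priority: Int, stageId: Int)

object FifoOrdering extends Ordering[Entry] {
  override def compare(a: Entry, b: Entry): Int = {
    val byPriority = a.priority.compareTo(b.priority)
    if (byPriority != 0) byPriority else a.stageId.compareTo(b.stageId)
  }
}
```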
