深入理解Spark:核心思想与源码分析. 3.10 创建和启动ExecutorAllocationManager

    xiaoxiao2024-01-02  167

    3.10 创建和启动ExecutorAllocationManager

    ExecutorAllocationManager用于对已分配的Executor进行管理,创建和启动Executor-AllocationManager的代码如下。

    private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =

        if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {

            Some(new ExecutorAllocationManager(this, listenerBus, conf))

        } else {

            None

        }

    executorAllocationManager.foreach(_.start())

    默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加、删除Executor。并且通过Thread不断添加Executor,遍历Executor,将超时的Executor杀掉并移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-47。

    代码清单3-47 ExecutorAllocationManager的关键代码

    private val intervalMillis: Long = 100

    private var clock: Clock = new RealClock

    private val listener = new ExecutorAllocationListener

    def start(): Unit = {

        listenerBus.addListener(listener)

        startPolling()

    }

     

    private def startPolling(): Unit = {

        val t = new Thread {

            override def run(): Unit = {

                while (true) {

                    try {

                        schedule()

                    } catch {

                        case e: Exception => logError("Exception in dynamic executor allocation thread!", e)

                    }

                    Thread.sleep(intervalMillis)

                }

            }

        }

        t.setName("spark-dynamic-executor-allocation")

        t.setDaemon(true)

        t.start()

    }

    根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。

      listenerBus.start()

    最新回复(0)