Deep Dive into Spark: Core Ideas and Source Code Analysis. 3.11 Creating and Starting ContextCleaner

    xiaoxiao  2024-01-02

    3.11 Creating and Starting ContextCleaner

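    ContextCleaner cleans up RDD, ShuffleDependency, and Broadcast objects that have gone out of the application's scope, that is, objects the driver program no longer references. Because the configuration property spark.cleaner.referenceTracking defaults to true, SparkContext constructs and starts a ContextCleaner, as shown in the following code.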

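    // Create a ContextCleaner only if reference tracking is enabled (the default)
    private[spark] val cleaner: Option[ContextCleaner] = {
        if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
            Some(new ContextCleaner(this))
        } else {
            None
        }
    }
    // Start the cleaning thread; does nothing when cleaner is None
    cleaner.foreach(_.start())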
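Wrapping the cleaner in an Option lets callers skip null checks: a disabled cleaner is None, and foreach(_.start()) then does nothing. The following is a minimal, self-contained sketch of that pattern; SimpleConf and Cleaner are hypothetical stand-ins for SparkConf and ContextCleaner, not Spark classes.

```scala
// Hypothetical stand-in for SparkConf: reads a boolean setting from a plain Map.
class SimpleConf(settings: Map[String, String]) {
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

// Hypothetical stand-in for ContextCleaner: only records whether it was started.
class Cleaner {
  @volatile var started = false
  def start(): Unit = { started = true }
}

// Mirrors the SparkContext code: build the component only when tracking is enabled.
def createCleaner(conf: SimpleConf): Option[Cleaner] =
  if (conf.getBoolean("spark.cleaner.referenceTracking", true)) Some(new Cleaner) else None

// Flag absent: the default (true) applies, so a cleaner is created and started.
val enabled = createCleaner(new SimpleConf(Map.empty))
enabled.foreach(_.start())

// Flag set to false: no cleaner exists, so foreach(_.start()) is a no-op.
val disabled = createCleaner(new SimpleConf(Map("spark.cleaner.referenceTracking" -> "false")))
disabled.foreach(_.start())
```

In a real application the flag would be set on the SparkConf before the SparkContext is created, for example conf.set("spark.cleaner.referenceTracking", "false").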

    ContextCleaner consists of the following components:

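    referenceQueue: a ReferenceQueue[AnyRef]; after a tracked object has been garbage collected, the JVM places its weak reference in this queue;

    referenceBuffer: holds the weak references (CleanupTaskWeakReference) to tracked objects, keeping the reference objects themselves alive until they are processed;

    listeners: a buffer of listeners that are notified when cleanup work completes;

    cleaningThread: the thread that performs the actual cleanup.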
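How referenceQueue and referenceBuffer work together can be illustrated with the plain java.lang.ref classes. This is a minimal sketch, not Spark's code: CleanupTask, CleanRDD, and registerForCleanup are simplified versions modeled on Spark's internal names, and Reference.enqueue() is called by hand to simulate what the garbage collector does once the tracked object becomes unreachable, which keeps the example deterministic.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

// Simplified cleanup task type (hypothetical; Spark defines its own internally).
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask

// A weak reference that carries the task to run once its referent is collected.
class CleanupTaskWeakReference(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

// The JVM enqueues weak references here after their referents are garbage collected.
val referenceQueue = new ReferenceQueue[AnyRef]
// Keeps the reference objects strongly reachable so they survive until enqueued.
val referenceBuffer = ArrayBuffer.empty[CleanupTaskWeakReference]

def registerForCleanup(obj: AnyRef, task: CleanupTask): Unit =
  referenceBuffer.synchronized {
    referenceBuffer += new CleanupTaskWeakReference(task, obj, referenceQueue)
  }

val fakeRdd = new Object // stands in for an RDD the driver is tracking
registerForCleanup(fakeRdd, CleanRDD(7))

// enqueue() simulates the garbage collector noticing fakeRdd is unreachable.
referenceBuffer.head.enqueue()
val ref = referenceQueue.poll().asInstanceOf[CleanupTaskWeakReference]
```

The buffer matters because a WeakReference object that is itself unreachable may be collected before the JVM ever enqueues it; holding it in referenceBuffer guarantees it arrives in referenceQueue.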

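    ContextCleaner works the same way as listenerBus: it also follows the listener pattern and does its work on a dedicated thread. That thread does nothing except call the keepCleaning method, whose implementation is shown in Code Listing 3-48.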
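The thread side of this design can be sketched on its own. In the hypothetical MiniCleaner below (not Spark's class), run() only calls keepCleaning(), the loop polls the queue with a timeout so it can notice the stopped flag, and stop() sets the flag, interrupts the thread while holding the same lock the loop uses for each task, then joins. The start()/stop() shape is modeled loosely on ContextCleaner; the 100 ms timeout is an assumption for illustration.

```scala
import java.lang.ref.ReferenceQueue

// Hypothetical, stripped-down cleaner showing only the thread lifecycle.
class MiniCleaner {
  @volatile private var stopped = false
  @volatile var handled = 0 // number of collected references processed
  private val referenceQueue = new ReferenceQueue[AnyRef]

  // The thread's run() does nothing but call keepCleaning().
  private val cleaningThread = new Thread() {
    override def run(): Unit = keepCleaning()
  }

  private def keepCleaning(): Unit = {
    while (!stopped) {
      try {
        // Wait up to 100 ms for a reference whose referent was garbage collected.
        val reference = Option(referenceQueue.remove(100L))
        // Hold the lock while handling it, so stop() cannot interrupt mid-task.
        synchronized { reference.foreach(_ => handled += 1) }
      } catch {
        case _: InterruptedException if stopped => // expected during stop(); loop exits
      }
    }
  }

  def start(): Unit = {
    cleaningThread.setDaemon(true)
    cleaningThread.setName("Mini Context Cleaner")
    cleaningThread.start()
  }

  def stop(): Unit = {
    stopped = true
    synchronized { cleaningThread.interrupt() }
    cleaningThread.join() // wait outside the lock so the loop can finish its task
  }

  def isRunning: Boolean = cleaningThread.isAlive
}

val cleaner = new MiniCleaner
cleaner.start()
val aliveAfterStart = cleaner.isRunning
cleaner.stop() // returns once the thread has left its loop
```

Polling with a timeout rather than blocking indefinitely means the loop rechecks stopped regularly even if an interrupt is missed.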

    Code Listing 3-48 Implementation of keepCleaning

    private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
        while (!stopped) {
            try {
                // Block for up to REF_QUEUE_POLL_TIMEOUT ms waiting for a weak reference
                // whose referent has been garbage collected; None on timeout
                val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
                    .map(_.asInstanceOf[CleanupTaskWeakReference])
                // Synchronize here to avoid being interrupted on stop()
                synchronized {
                    reference.map(_.task).foreach { task =>
                        logDebug("Got cleaning task " + task)
                        // Stop tracking the reference now that it has been dequeued
                        referenceBuffer -= reference.get
                        // Dispatch to the cleanup routine for this object type
                        task match {
                            case CleanRDD(rddId) =>
                                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                            case CleanShuffle(shuffleId) =>
                                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                            case CleanBroadcast(broadcastId) =>
                                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                        }
                    }
                }
            } catch {
                // stop() interrupts the thread; the interrupt is expected once stopped is set
                case ie: InterruptedException if stopped => // ignore
                case e: Exception => logError("Error in cleaning thread", e)
            }
        }
    }
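The poll, dispatch, and notify steps of Listing 3-48 can be exercised without a SparkContext. In this hedged sketch the task case classes, CleanupListener, and MiniCleaner are simplified, hypothetical stand-ins (Spark's real CleanerListener has a separate callback per object type), println calls stand in for doCleanupRDD, doCleanupShuffle, and doCleanupBroadcast, and the blocking flags are omitted. cleanOnce runs a single iteration of the loop so it can be called directly, and enqueue() again stands in for the garbage collector.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

// Simplified versions of the three task types matched in keepCleaning.
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask
case class CleanBroadcast(broadcastId: Long) extends CleanupTask

class CleanupTaskWeakReference(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

// Hypothetical listener with a single callback for every task type.
trait CleanupListener { def cleaned(task: CleanupTask): Unit }

class MiniCleaner {
  val referenceQueue = new ReferenceQueue[AnyRef]
  val referenceBuffer = ArrayBuffer.empty[CleanupTaskWeakReference]
  val listeners = ArrayBuffer.empty[CleanupListener]

  def register(obj: AnyRef, task: CleanupTask): CleanupTaskWeakReference = synchronized {
    val ref = new CleanupTaskWeakReference(task, obj, referenceQueue)
    referenceBuffer += ref
    ref
  }

  // One iteration of the keepCleaning loop; returns the task handled, if any.
  def cleanOnce(timeoutMs: Long): Option[CleanupTask] = {
    val reference = Option(referenceQueue.remove(timeoutMs))
      .map(_.asInstanceOf[CleanupTaskWeakReference])
    synchronized {
      reference.map { ref =>
        referenceBuffer -= ref // stop tracking the dequeued reference
        ref.task match {
          case CleanRDD(id)       => println(s"removing RDD $id")
          case CleanShuffle(id)   => println(s"removing shuffle $id")
          case CleanBroadcast(id) => println(s"removing broadcast $id")
        }
        listeners.foreach(_.cleaned(ref.task)) // notify listeners after cleanup
        ref.task
      }
    }
  }
}

val cleaner = new MiniCleaner
val seen = ArrayBuffer.empty[CleanupTask]
cleaner.listeners += new CleanupListener { def cleaned(t: CleanupTask): Unit = seen += t }

val shuffle = new Object
cleaner.register(shuffle, CleanShuffle(3)).enqueue() // simulate GC of shuffle
val first = cleaner.cleanOnce(100)  // prints "removing shuffle 3"
val second = cleaner.cleanOnce(100) // queue is empty: times out and returns None
```

Returning the handled task from cleanOnce is only a convenience for inspection; the real loop discards it and simply repeats until stopped.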
