
    3.3 Creating the metadataCleaner

    To keep track of all persisted RDDs, SparkContext uses persistentRdds, a cache of type TimeStampedWeakValueHashMap. The job of metadataCleaner is to clear out persisted RDDs that have expired. The code that creates metadataCleaner is as follows.

    private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
    private[spark] val metadataCleaner =
        new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
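    The delay is not hard-coded: MetadataCleaner.getDelaySeconds resolves it from the SparkConf. As a hedged sketch (assuming the Spark 1.x configuration key spark.cleaner.ttl; any per-cleaner override key derived from the cleaner type is an assumption here), enabling the cleaner for an application would look roughly like this:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: give cached metadata a TTL so metadataCleaner has something to enforce.
    // spark.cleaner.ttl is the global TTL in seconds; without it, getDelaySeconds
    // typically returns a non-positive value and the cleanup timer is never scheduled.
    val conf = new SparkConf()
        .setAppName("metadata-cleaner-demo")
        .setMaster("local[*]")
        .set("spark.cleaner.ttl", "3600")    // entries older than one hour become eligible for cleanup
    val sc = new SparkContext(conf)          // SparkContext builds persistentRdds and metadataCleaner from this conf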

    Let us look more closely at the implementation of MetadataCleaner, shown in Listing 3-14.

    Listing 3-14 The implementation of MetadataCleaner

    private[spark] class MetadataCleaner(
            cleanerType: MetadataCleanerType.MetadataCleanerType,
            cleanupFunc: (Long) => Unit,
            conf: SparkConf)
        extends Logging
    {
        val name = cleanerType.toString

        private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType)
        private val periodSeconds = math.max(10, delaySeconds / 10)
        private val timer = new Timer(name + " cleanup timer", true)

        private val task = new TimerTask {
            override def run() {
                try {
                    cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
                    logInfo("Ran metadata cleaner for " + name)
                } catch {
                    case e: Exception => logError("Error running cleanup task for " + name, e)
                }
            }
        }

        if (delaySeconds > 0) {
            timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000)
        }

        def cancel() {
            timer.cancel()
        }
    }
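    For concreteness, suppose the configuration resolves delaySeconds to 3600 (one hour); the snippet below simply works through the scheduling arithmetic in Listing 3-14 with purely illustrative values.

    // Illustrative values, assuming delaySeconds = 3600 was read from the configuration.
    val delaySeconds  = 3600
    val periodSeconds = math.max(10, delaySeconds / 10)    // = 360, i.e. every six minutes
    // timer.schedule(task, 3600 * 1000, 360 * 1000) means:
    //   the task first fires one hour after SparkContext starts, then every six minutes;
    //   each run passes System.currentTimeMillis() - 3600 * 1000 to cleanupFunc,
    //   so only entries older than one hour are candidates for removal.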

    As the implementation shows, MetadataCleaner is essentially a timer built on TimerTask that repeatedly invokes the function passed in as the cleanupFunc: (Long) => Unit parameter. When metadataCleaner is constructed, that parameter is the cleanup method, which clears expired entries from persistentRdds. Its code is as follows.

    private[spark] def cleanup(cleanupTime: Long) {
        persistentRdds.clearOldValues(cleanupTime)
    }
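    persistentRdds records a timestamp for each entry it holds, and clearOldValues drops the entries whose timestamp is older than the threshold it is given. The following simplified, self-contained analogue (a hypothetical SimpleTimeStampedMap, not the Spark class, and without its weak-reference behavior) sketches that contract:

    import scala.collection.concurrent.TrieMap

    // Hypothetical, simplified analogue of TimeStampedWeakValueHashMap:
    // every put records the insertion time, and clearOldValues removes the
    // entries inserted before the given threshold.
    class SimpleTimeStampedMap[K, V] {
        private case class Stamped(value: V, timestamp: Long)
        private val underlying = TrieMap.empty[K, Stamped]

        def put(key: K, value: V): Unit =
            underlying.put(key, Stamped(value, System.currentTimeMillis()))

        def get(key: K): Option[V] = underlying.get(key).map(_.value)

        // Mirrors what cleanup(cleanupTime) relies on: purge everything inserted
        // before cleanupTime = now - delaySeconds * 1000.
        def clearOldValues(threshTime: Long): Unit =
            underlying.foreach { case (k, stamped) =>
                if (stamped.timestamp < threshTime) underlying.remove(k)
            }
    }

    In the real Spark class the values are additionally held through weak references, so an entry whose RDD is no longer reachable can disappear even before its TTL expires.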
