深入理解Spark:核心思想与源码分析, 3.12 Updating the Spark Environment

    xiaoxiao  2024-01-02


    Initializing SparkContext may change its environment, so the environment information needs to be updated afterwards. The code is as follows:

    postEnvironmentUpdate()
    postApplicationStart()

    During SparkContext initialization, if the spark.jars property is set, the jar packages it specifies are added by the addJar method under the path given by httpFileServer's jarDir variable; likewise, the files specified by spark.files are added by the addFile method under the path given by httpFileServer's fileDir variable. See Listing 3-49.

    Listing 3-49 Handling dependency files

    val jars: Seq[String] =
        conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

    val files: Seq[String] =
        conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

    // Add each JAR given through the constructor
    if (jars != null) {
        jars.foreach(addJar)
    }

    if (files != null) {
        files.foreach(addFile)
    }
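    The Option/map/filter chain above can be exercised on its own. The sketch below reproduces it over a plain Map standing in for SparkConf (JarListParsing and parseList are hypothetical names for illustration); note how empty entries from stray commas are dropped and how an absent key yields an empty Seq rather than null.

```scala
// Minimal sketch of the spark.jars parsing chain from Listing 3-49,
// with a plain Map[String, String] standing in for SparkConf.
object JarListParsing {
  def parseList(conf: Map[String, String], key: String): Seq[String] =
    conf.get(key)                  // Option[String]: None when the key is absent
      .map(_.split(","))           // split the comma-separated list
      .map(_.filter(_.nonEmpty))   // drop empty entries from ",," or a trailing ","
      .toSeq
      .flatten                     // Option[Array[String]] -> Seq[String]

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.jars" -> "a.jar,b.jar,,c.jar")
    println(parseList(conf, "spark.jars").mkString(" "))  // a.jar b.jar c.jar
    assert(parseList(conf, "spark.files").isEmpty)        // absent key: empty Seq
  }
}
```

    Because the absent-key case produces an empty Seq, the null checks in Listing 3-49 are defensive rather than strictly necessary for this code path.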

    The addFile and addJar methods of httpFileServer are shown in Listing 3-50.

    Listing 3-50 HttpFileServer provides access to dependency files

    def addFile(file: File) : String = {
        addFileToDir(file, fileDir)
        serverUri + "/files/" + file.getName
    }

    def addJar(file: File) : String = {
        addFileToDir(file, jarDir)
        serverUri + "/jars/" + file.getName
    }

    def addFileToDir(file: File, dir: File) : String = {
        if (file.isDirectory) {
            throw new IllegalArgumentException(s"$file cannot be a directory.")
        }
        Files.copy(file, new File(dir, file.getName))
        dir + "/" + file.getName
    }
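    The pattern in Listing 3-50 is "copy the file into a served directory, then return the URI executors will download it from". A self-contained sketch of that pattern follows; the original uses Guava's Files.copy, so java.nio.file is substituted here, and FileServerSketch, serverUri, and fileDir are hypothetical stand-ins, not Spark's actual members.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Runnable sketch of HttpFileServer's copy-then-build-URI logic
// (Listing 3-50), with java.nio.file in place of Guava's Files.copy.
object FileServerSketch {
  val serverUri: String = "http://localhost:8080"               // assumed address
  val fileDir: File = Files.createTempDirectory("files").toFile // assumed layout

  def addFileToDir(file: File, dir: File): String = {
    if (file.isDirectory) {
      throw new IllegalArgumentException(s"$file cannot be a directory.")
    }
    // Copy the dependency into the served directory.
    Files.copy(file.toPath, new File(dir, file.getName).toPath,
      StandardCopyOption.REPLACE_EXISTING)
    dir + "/" + file.getName
  }

  def addFile(file: File): String = {
    addFileToDir(file, fileDir)
    serverUri + "/files/" + file.getName  // URI that executors fetch from
  }

  def main(args: Array[String]): Unit = {
    val src = File.createTempFile("dep", ".txt")
    Files.write(src.toPath, "payload".getBytes)
    val uri = addFile(src)
    assert(uri == serverUri + "/files/" + src.getName)
    assert(new File(fileDir, src.getName).exists())
    println(uri)
  }
}
```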

    The implementation of postEnvironmentUpdate is shown in Listing 3-51. Its processing steps are:

    1) Call SparkEnv's environmentDetails method to assemble the environment information: JVM parameters, Spark properties, system properties, classpath entries, and so on. See Listing 3-52.

    2) Create a SparkListenerEnvironmentUpdate event and post it to listenerBus. This event is handled by EnvironmentListener and ultimately determines what is shown on the EnvironmentPage of the web UI.

    Listing 3-51 Implementation of postEnvironmentUpdate

    private def postEnvironmentUpdate() {
        if (taskScheduler != null) {
            val schedulingMode = getSchedulingMode.toString
            val addedJarPaths = addedJars.keys.toSeq
            val addedFilePaths = addedFiles.keys.toSeq
            val environmentDetails =
                SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
            val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
            listenerBus.post(environmentUpdate)
        }
    }
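    The post-to-listener-bus mechanism that postEnvironmentUpdate relies on can be sketched independently of Spark: an event case class, a listener trait, and a bus that delivers each posted event to every registered listener. All names below (EnvironmentUpdate, Listener, ListenerBus) are illustrative, not Spark's real API, and delivery here is synchronous, whereas Spark's LiveListenerBus dispatches asynchronously.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative event, mirroring the shape of SparkListenerEnvironmentUpdate's
// payload: section name -> sorted (key, value) pairs.
case class EnvironmentUpdate(details: Map[String, Seq[(String, String)]])

trait Listener {
  def onEnvironmentUpdate(event: EnvironmentUpdate): Unit
}

class ListenerBus {
  private val listeners = ArrayBuffer.empty[Listener]
  def addListener(l: Listener): Unit = listeners += l
  // Deliver the event to every registered listener, in registration order.
  def post(event: EnvironmentUpdate): Unit =
    listeners.foreach(_.onEnvironmentUpdate(event))
}

object BusDemo {
  def main(args: Array[String]): Unit = {
    val bus = new ListenerBus
    var seen: Option[EnvironmentUpdate] = None
    bus.addListener(new Listener {
      def onEnvironmentUpdate(e: EnvironmentUpdate): Unit = seen = Some(e)
    })
    bus.post(EnvironmentUpdate(Map("Spark Properties" -> Seq("spark.app.name" -> "demo"))))
    assert(seen.get.details("Spark Properties") == Seq(("spark.app.name", "demo")))
  }
}
```

    In Spark, EnvironmentListener plays the role of the anonymous listener above, caching the event's details for rendering on the EnvironmentPage.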

    Listing 3-52 Implementation of environmentDetails

    val jvmInformation = Seq(
        ("Java Version", s"$javaVersion ($javaVendor)"),
        ("Java Home", javaHome),
        ("Scala Version", versionString)
    ).sorted

    val schedulerMode =
        if (!conf.contains("spark.scheduler.mode")) {
            Seq(("spark.scheduler.mode", schedulingMode))
        } else {
            Seq[(String, String)]()
        }
    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

    // System properties that are not java classpaths
    val systemProperties = Utils.getSystemProperties.toSeq
    val otherProperties = systemProperties.filter { case (k, _) =>
        k != "java.class.path" && !k.startsWith("spark.")
    }.sorted

    // Class paths including all added jars and files
    val classPathEntries = javaClassPath
        .split(File.pathSeparator)
        .filterNot(_.isEmpty)
        .map((_, "System Classpath"))
    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
    val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

    Map[String, Seq[(String, String)]](
        "JVM Information" -> jvmInformation,
        "Spark Properties" -> sparkProperties,
        "System Properties" -> otherProperties,
        "Classpath Entries" -> classPaths)
    }
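    The core of Listing 3-52 is a grouping step: system properties that are neither the classpath nor spark.* keys end up under "System Properties", and each section is sorted. The sketch below isolates that step over made-up input maps (EnvDetailsSketch and details are hypothetical names; the real method also gathers JVM information and classpath entries, which are omitted here).

```scala
// Sketch of environmentDetails' grouping logic: filter out the classpath
// and spark.* keys from the system properties, sort each section, and
// return section name -> (key, value) pairs.
object EnvDetailsSketch {
  def details(systemProps: Map[String, String],
              sparkProps: Map[String, String]): Map[String, Seq[(String, String)]] = {
    val otherProperties = systemProps.toSeq.filter { case (k, _) =>
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted
    Map(
      "Spark Properties"  -> sparkProps.toSeq.sorted,
      "System Properties" -> otherProperties)
  }

  def main(args: Array[String]): Unit = {
    val sys = Map("java.class.path" -> "/cp", "os.name" -> "Linux", "spark.x" -> "1")
    val d = details(sys, Map("spark.master" -> "local"))
    // Only os.name survives the filter; the classpath and spark.* keys are excluded.
    assert(d("System Properties") == Seq(("os.name", "Linux")))
    println(d)
  }
}
```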

    The postApplicationStart method is simple: it only posts a SparkListenerApplicationStart event to listenerBus, as follows.

    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser))
