深入理解Spark:核心思想与源码分析. 3.5 Hadoop相关配置及Executor环境变量

    xiaoxiao2024-01-10  170

    3.5 Hadoop相关配置及Executor环境变量

    3.5.1 Hadoop相关配置信息

    默认情况下,Spark使用HDFS作为分布式文件系统,所以需要获取Hadoop相关配置信息的代码如下。

    val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

    获取的配置信息包括:

    将Amazon S3文件系统的AccessKeyId和SecretAccessKey加载到Hadoop的Configuration;

    将SparkConf中所有以spark.hadoop.开头的属性都复制到Hadoop的Configuration;

    将SparkConf的属性spark.buffer.size复制为Hadoop的Configuration的配置io.file.buffer.size。

    如果指定了SPARK_YARN_MODE属性,则会使用YarnSparkHadoopUtil,否则默认为SparkHadoopUtil。

    3.5.2 Executor环境变量

    对Executor的环境变量的处理,参见代码清单3-28。executorEnvs 包含的环境变量将会在7.2.2节中介绍的注册应用的过程中发送给Master,Master给Worker发送调度后,Worker最终使用executorEnvs提供的信息启动Executor。可以通过配置spark.executor.memory指定Executor占用的内存大小,也可以配置系统变量SPARK_EXECUTOR_MEMORY或者SPARK_MEM对其大小进行设置。

    代码清单3-28 Executor环境变量的处理

    private[spark] val executorMemory = conf.getOption("spark.executor.memory")

            .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))

            .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))

            .map(Utils.memoryStringToMb)

            .getOrElse(512)

     

        // Environment variables to pass to our executors.

        private[spark] val executorEnvs = HashMap[String, String]()

     

        for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))

            value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty (propKey)))} {

            executorEnvs(envKey) = value

        }

        Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>

            executorEnvs("SPARK_PREPEND_CLASSES") = v

        }

        // The Mesos scheduler backend relies on this environment variable to set executor memory.

      executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"

        executorEnvs ++= conf.getExecutorEnv

     

        // Set SPARK_USER for user who is running SparkContext.

        val sparkUser = Option {

            Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))

        }.getOrElse {

            SparkContext.SPARK_UNKNOWN_USER

        }

        executorEnvs("SPARK_USER") = sparkUser

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)