本节书摘来自华章计算机《Spark与Hadoop大数据分析》一书中的第3章,第3.2节,作者:文卡特·安卡姆(Venkat Ankam) 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
在本节,我们要了解 Spark 的核心概念。Spark 提供的主要抽象是弹性分布式数据集(Resilient Distributed Dataset,RDD)。因此,我们要了解 RDD 是什么,以及提供内存级性能和容错的 RDD 中包含的运算。但是,首先我们要学习使用 Spark 的方法。3.2.1 使用 Spark 的方法 使用 Spark 有两种方法,即 Spark Shell 和 Spark 应用程序。
Spark Shell 这是可以利用 Scala、Python 或 R 进行数据探索的交互式命令行(read-eval-print loop,REPL)功能:要获取 spark-shell 选项的完整列表,请使用以下命令:
探索 Spark Scala shellScala shell 提供了许多实用功能和 Tab 键自动补全,以方便使用。其中一些有用的实用功能如下面的例子所示。执行系统命令并检查返回码:
执行系统命令并检查输出:
在 shell 中粘贴 Spark 代码行:
要从 Scala shell 退出,请使用 :q 命令。通过传递一组要运行的命令,进入 Scala shell :
进入 Scala shell,执行命令,然后退出:
Python shell 不提供 Tab 键自动补全功能,但 iPython 笔记本提供了 Tab 键自动补全功能。
Spark 应用程序 Spark shell 用于开发和测试,而 Spark 应用程序用于创建和调度大规模数据处理应用程序的生产系统。创建应用程序可以利用本机支持的语言,如 Python、Scala、Java、SQL、R, 或利用流水线方法调用外部程序。spark-submit 用于提交 Spark 应用程序,如以下基于 Scala 的应用程序示例所示:用 spark-submit 也可以提交基于 Python 的应用程序,如下所示:
连接到启用了 Kerberos 安全的 Spark 集群用户可以使用 kinit 命令获取 Kerberos 票据(ticket)来使用 shell 或应用程序。对于提交Spark作业的应用程序,可使用 klist 命令显示Spark的keytab里的principal,如下所示:
请使用带有 keytab 和 principal 参数的 kinit 命令来获取票据如下所示:
一旦获取票据之后,应用程序就应该能够利用 shell 或应用程序连接到 Spark 集群。在使用 YARN 作为资源管理器时,你也可以通过 spark-submit 命令传递 --keytab 和 --principal 选项。3.2.2 弹性分布式数据集 RDD 是 Spark 数据的基础单元,Spark 编程是围绕着在 RDD 上创建和执行操作来进行的。它们是跨集群进行分区的不可变集合(immutable collection),如果某个分区丢失,这些分区可以重建(重新计算)。它们是使用数据流运算符(map、filter、group-by)在稳定存储中通过对数据进行变换而创建的,并且可以在并行运算架构的内存中进行缓存:弹性:如果内存中的数据丢失,它可以重新创建(或重新计算)分布式:在集群中进行分布数据集:初始数据可以来自文件或以编程方式创建创建 RDD 有两种方法:并行化(parallelize)或从文件读取。
方法1:并行化一个集合取出内存中已有的一个数据集合,将其传递给 SparkContext 的并行化方法。这种方法通常不用于生产系统,但可以用于原型设计和测试,因为它需要一台机器上的内存中的整个数据集。下面是使用并行方法创建 RDD 的一些示例:
方法 2:从一个文件中读取第二种方法是从 HDFS、S3、HBase、Avro、Parquet 文件和Hadoop支持的其他格式读取数据。输入可以是文件或目录。通配符也是支持的。下面的例子通过读取一个输入文件来创建一个名为 inputRDD 的 RDD:
从 HDFS 读取文件 从 HDFS 读取文件有两种方法。第一种方法是用管理节点主机名和 RPC 端口号来指定 HDFS 的 URI:
第二种方法是在 spark-env.sh 中设置和指定 HADOOP_CONF_DIR 环境变量,并直接指定路径:
HADOOP_CONF_DIR 可以在 /etc/spark/conf/spark-env.sh 中设置,也可以将 Hadoop 配置文件添加到 Spark 配置目录中。SparkContext.textFile 会调用 org.apache.hadoop.mapred.FileInputFormat.getSplits,它再接着调用 org.apache.hadoop.fs.getDefaultUri。此方法会读取 Hadoop 的 conf (core-site.xml) 里的 fs.defaultFS 参数。从启用了高可用性的 HDFS 读取文件当启用了管理节点的高可用性(High Availability,HA)时,不要在 URI 中使用活动的管理节点主机名,因为在出现任何故障的情况下,HA 会将活动的管理节点切换到备用管理节点。因此,要使用来自 core-site.xml 的属性名 fs.defaultFS 中的值。示例:hdfs://Nameservice1:8020/ 。**3.2.3 Spark 环境每个Spark应用程序都需要一个 Spark 环境,这是 Spark RDD API 的主要入口点。Spark shell 提供了一个名为“sc”的预配置 Spark 环境和一个名为“spark”的预配置 Spark 会话,如图3-1所示。
图3-1 Spark shell 里的 Spark 环境“sc”在 Spark 应用程序中,必须首先创建 SparkContext,如下面的 Scala 代码所示。为了创建 SparkContext 对象,就要构建一个含有该应用程序相关信息的 SparkConf 对象。每个应用程序只会创建一个 SparkContext:
3.2.4 变换和动作RDD 操作有两种类型:变换(transformation)和动作(action)。变换会根据当前的 RDD 来定义新的 RDD。动作则是从 RDD 返回值。让我们通过可视化的方式来看一些变换和动作的例子,从而更深刻地理解它们。图3-2 显示了如何读取一个示例文本文件(使用 Python 代码),用它来创建一个名为 fileRDD 的基础 RDD,并使用“map”变换来进行变换,创建出 upperRDD。最后,upperRDD 用“filter”变换进行过滤,并通过“collect”动作生成输出。在出现一个动作之前,变换并不会导致任何操作的执行。因此,变换会被延迟评估,动作负责启动作业来执行所有变换。在本示例中,由于“collect”操作,实际的作业执行从第 3 行开始。
图3-2 利用 Python 进行 Map 和 Filter 变换在前面的例子中,我们已经看到了数据是如何变换的,以及通过动作才能产生结果。让我们借助 Python 语言,通过一个日志分析(Log Analytics)示例来深入了解在内部究竟发生了什么:
图3-3 讲解了 Log Analytics 日志分析示例。
图3-3 Log Analytics工作流Log Analytics示例从具有3个块(B1、B2和B3)的HDFS目录读取文件,并创建名为access_log的基础 RDD。然后这个基础 RDD 会被过滤,从而创建包含ERROR记录的 error_log RDD。error_log RDD 会在内存中缓存,缓存的 RDD 名为 cached_log。对 cached_log RDD会执行两个动作。由于动作会导致变换的执行,第一个动作会创建 access_log、error_log 和 cached_log,然后将结果发送到客户端。第二个动作不会创建 access_log、error_log 和 cached_log。它将直接从 cached_log 读取数据。所以,第二个动作的性能会比第一个动作好得多。如果要在同一 RDD 上执行多个动作,那么缓存数据总是值得鼓励的。缓存可以在程序中的任何点进行。例如,可以在创建基准 RDD access_log 之后直接进行缓存。但是,在这种情况下,你将在缓存中存储大量的数据。所以,我们总是建议先过滤掉不需要的数据,然后再进行缓存。未缓存的 RDD 会由垃圾回收处理。因此,在 Log Analytics 示例中,access_log 和 error_log RDD 会由垃圾回收处理。可用的 RDD 变换和动作的列表可以在以下链接中看到:Python:https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD Scala:https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD Java:http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaRDD.html 3.2.5 RDD 中的并行度RDD 中的并行度由 spark.default.parallelism 参数控制。它默认为执行进程上的 CPU 内核总数或 2(以较大者为准)。让我们参考下面的例子来理解这个概念。使用 pyspark 命令进入 python shell,然后检查默认并行度,如下所示。假设你的集群上可用的默认 CPU 内核数量为 8:
但是,如果使用以下命令进入 Spark shell,则默认并行度将与分配的 CPU 内核数量相同:
让我们创建一个列表,把它并行化,并检查分区数:
此默认值与 sc.defaultParallelism 的值相同。为了覆盖默认的并行度,可以在创建 RDD 时提供所需的具体分区数。在这个例子中,让我们创建具有 6 个分区的 RDD:
让我们发出一个动作来计算列表中元素的数量:
getNumPartitions()方法显示 myRDD 中有 4 个分区。因此,对这个 RDD 的任何动作都需要 4 个任务。这意味着为了计算.count()动作,驱动进程JVM会将“计数代码”传送给在不同计算机上运行的 4 个任务(线程)。每个任务/线程只从一个分区读取和计数数据,并将结果发送回驱动进程 JVM。然后驱动进程将所有 4 个计数汇总为最终答案。这个过程可以通过查看 Spark 的管理界面来显示。Spark 的管理界面地址是:http://masterhostname:8080。单击 Running Applications下的应用程序 ID,然后单击Application Detail UI,它会带你转到 http://masterhostname:4040/jobs/ 这个管理界面。你可以看到为此动作创建了 4 个任务,如图3-4 所示。我们可以通过点击 Environment 和 Executors 选项卡来查看其他细节。Storage 选项卡会显示缓存的 RDD 及其缓存百分比和已缓存数据的大小。
图3-4 显示 4 个分区的 4 个任务的管理界面如果单击已完成的作业,你可以查看任务持续时间、执行进程 ID、主机名和其他详细信息,如图3-5 所示。
图3-5 显示任务持续时间的管理界面现在,mapPartitionsWithIndex() 变换使用了一个 lambda 函数,它接受一个分区索引(比如分区号)和一个迭代器(针对该特定分区中的元素)。对于输入的每一对“分区索引 + 迭代器”参数,lambda 函数会返回同一个分区索引号的元组以及该分区中实际元素的列表:
上面的结果解释了列表是如何分布在 RDD 分区上的。现在让我们增加分区的数量,看看数据如何重新分配到新 RDD 的分区。
现在,我们看到了有意思的情况:分区 0 和 2 是空的,而其他分区有数据。对这个 RDD 执行的任何动作将会有 6 个任务,其中任务 0 和任务 2 没有要处理的数据。因此,这将带来任务 0 和 2 的调度开销。现在,让我们尝试使用 coalesce() 函数来减少分区数:
coalesce 函数在减少分区数量方面非常有用,因为它不会导致混排。而重新分区会导致数据在集群中进行物理性的混排。请注意,在这个例子里,来自其他分区的数据把数据移动到了分区 0 和分区 1,而不是对所有分区进行混排。这个例子对于避免混排的代表性还不是太充分,但是有多个分区和更多数据元素的数据会清楚地表明数据混排是受限的。数量较大的分区或规模较小的分区具有更好的并行度,但是它们会产生调度和分发数据的开销。数量较小的分区或规模较大的分区有较低的调度和分发数据开销,但是它们对于大小不对称的分区会具有较低的并行度和较长的作业执行时间。良好的分区大小的合理范围为 100 MB~1 GB。当从 HDFS 读取一个文件时,Spark 会为 HDFS 的每个块创建一个分区。因此,如果某个 HDFS 文件有 8 个块,则创建的 RDD 将具有 8 个分区,如下所示。然而,可以通过指定所需的分区的数目来增加分区的数目。请注意,在这种情况下,分区数不能减少:
3.2.6 延迟评估 正如我们在日志分析示例中所见,对 RDD 的变换是延迟评估的,这样会优化 Spark 里的磁盘和内存占用。RDD 在创建时为空,只会确定类型和 IDSpark 要在看到一个动作的时候才会开始执行作业延迟评估的作用是,通过将操作组合在一起,从而减少它必须处理数据的次数在 MapReduce中,开发人员必须花费大量时间考虑如何将操作进行组合,从而最小化 MR 传递的次数如果某个任务生成的分区已经被缓存,那么该任务及其所有相关任务的大部分都可以省略
3.2.7 谱系图RDD 从不会在内存中复制。在机器故障的情况下,RDD 会利用谱系图(Lineage Graph)自动重建。RDD 在创建的时候会记住自己是如何构建的:是通过读取输入文件,还是通过变换其他 RDD 并利用它们来重建自己。这是一个基于有向无环图(Directed Acyclic Graph,DAG)的表示法,包含了它所有的依赖关系。在日志分析示例中,调用 toDebugString 函数,我们就可以找到 RDD 的谱系图。
结果显示了这个 RDD 是如何从它的父 RDD 构建出来的。如果 RDD 已经在内存或磁盘上进行了缓存,Spark的内部调度程序就可以截短 RDD 图的谱系。在这种情况下,Spark 可以“短路”,只根据已经持久化的 RDD 开始计算。第二种可能发生截短的情况是,由于之前混排的副作用,RDD 已经被物化了,即使它没有被显式地缓存。这是一种隐含的优化手段,它所利用的事实是 Spark 的混排输出会写入磁盘,并且 RDD 图的各部分会被多次重新计算。我们可以将 spark.shuffle.spill 参数设置为 false,从而避免这种将数据分散到磁盘的情况。RDD 是一种具有以下信息的接口。对于谱系:一组分区(类似于 Hadoop 中的拆分)父 RDD 的依赖性列表(谱系图)根据父 RDD 计算一个分区的函数对于优化的执行:可选的首选位置可选的分区信息(分区程序)第 2 点中的依赖关系可以或窄或宽。在窄依赖性中,父 RDD 的每个分区最多可以由子 RDD 的一个分区使用。在宽依赖性中,多个子分区都可以依赖于某个父 RDD 分区。图3-6 显示了窄和宽依赖性。
图3-6 窄变换与宽变换3.2.8 序列化 从驱动进程发送到执行进程的每个任务和在执行进程之间发送的数据都要序列化。使用合适的序列化框架来获得应用程序的最佳性能是非常重要的。Spark 提供了两个序列化库:Java 序列化和 Kryo 序列化。虽然 Java 序列化比较灵活,但它比较慢,会产生较大的序列化对象。Kryo 序列化大多用于想要更高的性能和更好的紧凑性。Kryo 序列化可以在 scala 应用程序中设置为:
它也可以从命令行指定为:
在 PySpark 里,Kryo 序列化没有用处,因为 PySpark 会把数据存储为字节对象。如果数据按 Hadoop 序列化格式的序列文件、AVRO、PARQUET 或协议缓冲区进行了序列化,Spark 会提供内置机制来读取和写入这些序列化格式的数据。利用 hadoopRDD 和 newAPIHadoopRDD 方法,可以读取任何 Hadoop 支持的 Inputformat。使用 saveAsHadoopDataset 和 saveAs-NewAPIHadoopDataset,输出可以写为任意的 Outputformat。Spark SQL 还支持所有支持 Hive 的存储格式(SerDes),以便从序列文件、Avro、ORC、Parquet 和协议缓冲区直接读取数据。3.2.9 在 Spark 中利用 Hadoop 文件格式 Spark 是利用 InputFormat 和 OutputFormat 的标准 Hadoop 库构建的。InputFormat 和 OutputFormat 是用来从 HDFS 中读取数据或在 MapReduce 程序中将数据写入 HDFS 的 Java API。Spark 天然支持这种特性,即使 Spark 没有运行在 Hadoop 集群上。除了文本文件之外,Spark 的 API 还支持其他几种数据格式:sc.wholeTextFiles:它会从 HDFS 目录中读取一小批文件作为键值对,其中文件名作为键,文件内容作为值。sc.sequenceFile:它会读取一个 Hadoop 序列文件。在 Python 中创建序列文件的一个示例如下:
现在你可以通过以下方式查看 Hadoop 中的内容:
在 Python 中读取一个序列文件的示例如下:
其他 Hadoop 格式有:hadoopFile:它会从 HDFS 读取一个具有任意键和值类的“旧”Hadoop InputFormat:
newAPIHadoopFile:它会从 HDFS 读取一个带有任意键和值类的“新API”的 Hadoop InputFormat:
在 Spark 程序中设置 Hadoop 配置属性的另一种方法如下所示:Scala:
Python:
要利用任意 Hadoop outputformat 输出数据,可以使用 saveAsHadoopFile 和 saveAsNew-APIHadoopFile 类。除了 hadoopFile 和 newAPIHadoopFile 之外,还可以使用 hadoopDataset 和 newAPIHadoop-Dataset 来读取(以及 saveAsHadoopDataset 和 saveAsNewAPIHadoopDataset用于写入)专用的 Hadoop 支持的数据格式的数据。hadoopDataset系列函数只需要一个配置对象,你可以在它上面设置访问数据源所需的 Hadoop 属性。它的配置方式与配置 Hadoop MapReduce 作业相同,因此你可以按照访问 MapReduce 其中一种数据源的操作指示来操作,然后将对象传递给 Spark:RDD.saveAsObjectFile 和 sc.objectFile:它们用来在 Java 或 Scala 中把 RDD 保存为序列化的 Java对象。在Python中,saveAsPickleFile 和 pickleFile方法是与pickle 序列化库一起使用的。3.2.10 数据的本地性 就像在 MapReduce 中一样,在运行 Spark 作业时,数据本地性(data locality)在改善性能方面起着至关重要的作用。数据本地性决定了数据存储的位置离处理它的代码有多近。与发送数据相比,将序列化的代码从一个地方发送到另一个地方会更快,因为代码的大小远小于数据。基于数据的当前位置,有几个级别的本地性。按顺序,从最近到最远如下表所示:
每个级别之间的撤回操作等待超时可以使用 spark.locality.wait 参数进行配置,默认是 3 秒。数据的本地性可以很容易从管理界面中检查,如图3-7 所示。这个例子表示该作业中的所有任务都是在本地处理的:
图3-7 数据本地性级别从 HDFS 读取文件时,Spark 会从 HDFS 获取区块位置,并尝试在区块存储的节点上分配任务。但是要注意, Spark 会先创建执行进程,然后运行任务。如果在所有节点上都有执行进程在运行,Spark 就能够在存储 HDFS 块的那些节点上分配任务。如果只分配了几个执行进程,就不容易获得数据的本地性。这就是使用 Spark 的 Standalone 资源管理器时的表现。当使用 YARN 资源管理器时,执行进程会被布置在存储 HDFS 区块的节点上。这是通过代码块中的 getPreferredLocations 函数实现的。请参阅以下链接:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 。3.2.11 共享变量 当 Spark 驱动进程将函数传递给执行进程时,每个函数都会使用单独的变量副本。因此,对于 4 个任务,会创建 4 个单独的变量,并且这些变量不会从执行进程 JVM 发送到驱动进程 JVM。虽然这样做很方便,但它也可能是低效的,因为默认的任务启动机制针对规模较小的任务进行了优化,这样我们就可能在多个并行任务中使用相同的变量。当驱动进程 JVM 向执行进程 JVM 发送大型对象(例如查找表)时,会出现性能问题。Spark 支持两种类型的共享变量。广播变量(broadcast variable):允许在每个工作机上缓存一个只读变量,而不是把它和每个任务搭配起来发送。因此,在处理 200 个任务的 20 个节点的集群中,只会创建 20 个广播变量,而不是 200 个。它相当于 MapReduce 术语中的分布式缓存。下面是一个在 PySpark shell 中使用广播变量的例子:
累加变量(accumulator):允许任务将数据写入一个共享的变量,而不是每个任务都有一个单独的变量。驱动进程可以访问累加变量的值。它相当于 MapReduce 术语中的计数器。下面是一个在 PySpark shell 中使用累加变量的例子:
3.2.12 键值对 RDD 针对含有键值对的RDD,Spark提供了特殊的变换和动作。这些 RDD 称为键值对 RDD。键值对 RDD 在许多 Spark 程序中非常有用,因为它们提供了支持对每个键进行并行操作或在网络上重新组合数据的变换和动作。例如,键值对 RDD 具有 reduceByKey 变换,可以针对每个键分别聚合数据,还有一个 join 变换,可以通过把键相同的元素进行组合,从而把两个 RDD 合并在一起。下面是在 Python 中创建一个键值对 RDD的示例,它以 word 作为键,长度作为值:
