在较高的层次上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main功能并在集群上执行各种并行操作。Spark提供的是弹性分布式数据集(RDD),它是跨群集节点分区的元素的集合,可以并行操作。RDD是通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有Scala集合对其进行转换而创建的。用户还可以要求Spark 在内存中保留 RDD,允许它在并行操作中有效地重用。最后,RDD会自动从节点故障中恢复。
Spark可以在并行操作中使用的共享变量。默认情况下,当Spark并行运行一个函数作为不同节点上的一组任务时,它会将函数中使用的每个变量的副本发送给每个任务。有时,变量需要跨任务共享,或者在任务和驱动程序之间共享。Spark支持两种类型的共享变量:广播变量,可用于缓存所有节点的内存中的值; 累加器,例如计数器计数。
以下的例子是使用Scala语言进行介绍:
Spark 2.4.3构建和分发默认使用的是Scala 2.12。(Spark也可以与其他版本的Scala一起使用。)要在Scala中编写应用程序,您需要使用兼容的Scala版本(例如2.12.X)。
要编写Spark应用程序,需要在Spark上添加Maven依赖项。Spark可通过Maven Central获得:
groupId = org.apache.spark artifactId = spark-core_2.12 version = 2.4.3此外,如果您希望访问HDFS群集,则需要hadoop-client为您的HDFS版本添加依赖关系 。
groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version>最后,您需要将一些Spark类导入到您的程序中。添加以下行:
import org.apache.spark.SparkContext import org.apache.spark.SparkConf(在Spark 1.3.0之前,您需要明确import org.apache.spark.SparkContext._启用必要的隐式转换。)
Spark程序必须做的第一件事是创建一个SparkContext对象,它告诉Spark如何访问集群。要创建SparkContext首先需要构建一个包含有关应用程序信息的SparkConf对象。
每个JVM只能激活一个SparkContext。stop()在创建新的SparkContext之前,您必须使用它。
val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf)该appName参数是应用程序在集群UI上显示的名称。 master是Spark,Mesos或YARN群集URL,或以本地模式运行的特殊“local”字符串。实际上,当在群集上运行时,您不希望master在程序中进行硬编码,而是启动应用程序spark-submit并在那里接收它。但是,对于本地测试和单元测试,您可以传递“local”以在进程中运行Spark。
参见上一篇文章Spark 快速开始
Spark围绕弹性分布式数据集(RDD)的概念展开,RDD是一个可以并行操作的容错的容错集合。创建RDD有两种方法:并行化 驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统,HDFS,HBase或提供Hadoop InputFormat的任何数据源。
并行集合通过调用创建SparkContext的parallelize一个现有的收集方法,在你的驱动程序(a Scala Seq)。复制集合的元素以形成可以并行操作的分布式数据集。例如,以下创建包含数字1到5的并行化集合:
scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> val distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26一旦创建,分布式数据集(distData)可以并行操作。例如,我们可能会调用distData.reduce((a, b) => a + b)来进行数据元素相加操作。我们稍后将描述对分布式数据集的操作。
并行集合的一个重要参数是将数据集切割为的分区数。Spark将为群集的每个分区运行一个任务。通常,您希望群集中的每个CPU有2-4个分区。通常,Spark会尝试根据您的群集自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize(例如sc.parallelize(data, 10))来手动设置。
Spark可以从Hadoop支持的任何存储源创建分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等.Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
通过使用SparkContext的textFile方法进行操作。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://,s3a://等URI),并读取其作为行的集合。这是一个示例调用:
scala> val distFile = sc.textFile("/soft/spark/README.md") distFile: org.apache.spark.rdd.RDD[String] = /soft/spark/README.md MapPartitionsRDD[2] at textFile at <console>:24创建后,distFile可以通过数据集操作执行操作。例如,我们可以使用map和reduce操作进行所有行的长度之和操作,如下所示:
scala> distFile.map(s => s.length).reduce((a, b) => a + b) res1: Int = 3847有关使用Spark读取文件的一些注意事项
如果在本地文件系统上使用路径,则还必须可以在工作节点上的相同路径上访问该文件。将文件复制到所有工作者或使用网络安装的共享文件系统。所有Spark的基于文件的输入方法,包括textFile支持在目录,压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory"),textFile("/my/directory/.txt")和textFile("/my/directory/.gz")。该textFile方法还采用可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(HDFS中默认为128MB),但您也可以通过传递更大的值来请求更多的分区。请注意,您不能拥有比块少的分区。除文本文件外,Spark的Scala API还支持其他几种数据格式:
SparkContext.wholeTextFiles允许您读取包含多个小文本文件的目录,并将它们作为(文件名,内容)对返回。这与textFile不同,textFile将在每个文件中每行返回一条记录。而wholeTextFiles是每个文件为一条记录,分区由数据局部性决定,在某些情况下,可能导致分区太少。对于这些情况,wholeTextFiles提供可选的第二个参数来控制最小数量的分区。
对于SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,其中K和V是文件中键和值的类型。这些是Hadoop的Writable接口的子类,如IntWritable和Text。此外,Spark允许您为一些常见的Writable指定类型; 例如,sequenceFile[Int, String]将自动读取IntWritables和文本。
对于其他Hadoop InputFormats,您可以使用该SparkContext.hadoopRDD方法,该方法采用任意JobConf输入格式类,键类和值类。设置这些与使用输入源的Hadoop作业的方式相同。您还可以使用SparkContext.newAPIHadoopRDD基于“new” MapReduce API(org.apache.hadoop.mapreduce)的InputFormats 。
RDD.saveAsObjectFile并SparkContext.objectFile支持以包含序列化Java对象的简单格式保存RDD。虽然这不像Avro这样的专用格式有效,但它提供了一种保存任何RDD的简便方法。
RDD支持两种类型的操作:transformations(从现有数据集创建新数据集)和actions(在数据集上运行计算后将值返回到驱动程序) 例如,map是一个转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD。另一方面,reduce是一个使用某个函数聚合RDD的所有元素并将最终结果返回给驱动程序的动作. Spark中的所有转换都是惰性的,因为它们不会立即计算结果。相反,他们只记得应用于某些基础数据集的转换(例如文件)。仅当操作需要将结果返回到驱动程序时才会计算转换。这种设计使Spark能够更有效地运行。 默认情况下,每次对其执行操作时,都需要重新计算每个转换后的RDD,也可以使用persist 或者cache方法持久化RDD到内存中,以便在下次查询时更快地访问。还支持在磁盘上保留RDD或在多个节点上复制
为了说明RDD基础知识,请考虑以下简单程序:
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)第一行定义来自外部文件的基本RDD。此数据集未加载到内存中:lines仅仅是指向文件的指针。 第二行定义lineLengths为map转换的结果。lineLengths 由于懒惰的性质,不是立即执行的。第三行我们运行reduce,这是一个动作。此时,Spark将计算分解为在不同机器上运行的任务,并且每台机器都运行其部分映射和本地聚合,返回其驱动程序的答案。 如果我们lineLengths以后想再次使用,我们可以添加:
lineLengths.persist()在reduce之前,这将lineLengths在第一次计算后保存在内存中。
Spark的一个难点是在跨集群执行代码时理解变量和方法的范围和生命周期。修改其范围之外的变量的RDD操作可能经常引起混淆。在下面的示例中,我们将查看foreach()用于递增计数器的代码,但其他操作也可能出现类似问题。
scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> var counter = 0 counter: Int = 0 scala> var rdd = sc.parallelize(data) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:26 // Wrong: Don't do this!! scala> rdd.foreach(x => counter += x) scala> println("Counter value: " + counter) Counter value: 0上述代码的行为未定义,可能无法按预期工作。为了执行作业,Spark将RDD操作的处理分解为任务,每个任务都由执行程序执行。在执行之前,Spark计算任务的闭包。闭包是那些变量和方法,它们必须是可见的,以便执行者在RDD上执行计算(在这种情况下foreach())。该闭包被序列化并发送给每个执行者。
发送给每个执行程序的闭包内的变量现在是副本,因此,当在函数内引用计数器时foreach,它不再是驱动程序节点上的计数器。驱动程序节点的内存中仍然有一个计数器,但执行程序不再可见!执行程序只能看到序列化闭包中的副本。因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用了序列化闭包内的值。
在本地模式下,在某些情况下,该foreach函数实际上将在与驱动程序相同的JVM中执行,并将引用相同的原始计数器,并且可能实际更新它。
为了确保在这些场景中明确定义的行为,应该使用Accumulator。Spark中的累加器专门用于提供一种机制,用于在跨集群中的工作节点拆分执行时安全地更新变量。本指南的“累加器”部分更详细地讨论了这些内容。
通常,闭包 - 类似循环或本地定义的方法的构造不应该用于改变某些全局状态。Spark没有定义或保证从闭包外部引用的对象的突变行为。执行此操作的某些代码可能在本地模式下工作,但这只是偶然的,并且此类代码在分布式模式下不会按预期运行。如果需要某些全局聚合,请使用累加器。
另一个常见的用法是尝试使用rdd.foreach(println)或rdd.map(println)打印RDD中的元素。在一台机器上,这将生成预期的输出并打印所有RDD的元素。但是,在cluster模式下,stdout执行程序调用的输出是现在写入执行stdout的程序,而不是驱动程序上的输出,因此stdout驱动程序不会显示这些!要打印驱动程序上的所有元素,可以使用该collect()方法首先将RDD收集到驱动程序节点:rdd.collect().foreach(println)。但是,这可能会导致驱动程序内存不足,因为collect()将整个RDD提取到一台机器上; 如果您只需要打印RDD的一些元素,更安全的方法是使用take():rdd.take(100).foreach(println)。
下表列出了Spark支持的一些常见转换。有关详细信息,请参阅RDD API文档(Scala, Java, Python, R),Pair RDD函数doc(Scala, Java)。
Transformation含义map(func)返回通过函数func传递源的每个元素形成的新分布式数据集。filter(func)返回通过选择func返回true 的源元素形成的新数据集。flatMap(func)与map类似,但每个输入项可以映射到0个或更多输出项(因此func应该返回Seq而不是单个项)。mapPartitions(func)与map类似,但在RDD的每个分区(块)上单独运行,因此当在类型T的RDD上运行时,func必须是Iterator <T> => Iterator <U>类型。mapPartitionsWithIndex(func)与mapPartitions类似,但也为func提供了表示分区索引的整数值,因此当在类型T的RDD上运行时,func必须是类型(Int,Iterator <T>)=> Iterator <U>。sample(withReplacement, fraction, seed)使用给定的随机数生成器种子,在有或没有替换的情况下对数据的一小部分进行采样。union(otherDataset)返回一个新数据集,其中包含源数据集和参数中元素的并集。intersection(otherDataset)返回包含源数据集和参数中元素交集的新RDD。distinct([numPartitions]))返回包含源数据集的不同元素的新数据集。groupByKey([numPartitions])在(K,V)对的数据集上调用时,返回(K,Iterable )对的数据集。注意:如果要进行分组以便对每个密钥执行聚合(例如总和或平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。 注意:默认情况下,输出中的并行级别取决于父RDD的分区数。您可以传递可选numPartitions参数来设置不同数量的任务。reduceByKey(func, [numPartitions])当调用(K,V)对的数据集时,返回(K,V)对的数据集,其中使用给定的reduce函数func聚合每个键的值,该函数必须是类型(V,V)=> V.同样groupByKey,reduce任务的数量可通过可选的第二个参数进行配置。aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])当调用(K,V)对的数据集时,返回(K,U)对的数据集,其中使用给定的组合函数和中性“零”值聚合每个键的值。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。同groupByKey,reduce任务的数量可通过可选的第二个参数进行配置。sortByKey([ascending], [numPartitions])当在K实现Ordered的(K,V)对的数据集上调用时,返回按键按升序或降序排序的(K,V)对的数据集,如布尔ascending参数中所指定的。join(otherDataset, [numPartitions])当调用类型(K,V)和(K,W)的数据集时,返回(K,(V,W))对的数据集以及每个键的所有元素对。外连接通过支持leftOuterJoin,rightOuterJoin和fullOuterJoin。cogroup(otherDataset, [numPartitions])当调用类型(K,V)和(K,W)的数据集时,返回(K,(Iterable <V>,Iterable <W>))元组的数据集。此操作也称为groupWith。cartesian(otherDataset)当调用类型为T和U的数据集时,返回(T,U)对的数据集(所有元素对)。pipe(command, [envVars])通过shell命令管道RDD的每个分区,例如Perl或bash脚本。RDD元素被写入进程的stdin,并且输出到其stdout的行将作为字符串的RDD返回。coalesce(numPartitions)将RDD中的分区数减少为numPartitions。过滤大型数据集后,可以更有效地运行操作。repartition(numPartitions)随机重shuffle RDD中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是随机shuffles网络上的所有数据。repartitionAndSortWithinPartitions(partitioner)根据给定的分区重新分区RDD,并在每个生成的分区中按键对记录进行排序。这比repartition在每个分区中调用然后排序更有效,因为它可以将排序推送到shuffle机器中。下表列出了Spark支持的一些常见操作。请参阅RDD API文档(Scala, Java, Python, R) ,pair RDD函数doc(Scala, Java)详细信息。
Action含义reduce(func)使用函数func(它接受两个参数并返回一个)来聚合数据集的元素。该函数应该是可交换的和关联的,以便可以并行正确计算。collect()在驱动程序中将数据集的所有元素作为数组返回。在过滤器或其他返回足够小的数据子集的操作之后,这通常很有用。count()返回数据集中的元素数。first()返回数据集的第一个元素(类似于take(1))take(n)返回包含数据集的前n个元素的数组。takeSample(withReplacement, num, [seed])返回一个数组,其中包含数据集的num个元素的随机样本,有或没有替换,可选地预先指定随机数生成器种子。takeOrdered(n, [ordering])使用自然顺序或自定义比较器返回RDD 的前n个元素。saveAsTextFile(path)将数据集的元素作为文本文件(或文本文件集)写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统的给定目录中。Spark将在每个元素上调用toString,将其转换为文件中的一行文本。saveAsSequenceFile(path) (Java and Scala)将数据集的元素作为Hadoop SequenceFile写入本地文件系统,HDFS或任何其他Hadoop支持的文件系统中的给定路径中。这可以在实现Hadoop的Writable接口的键值对的RDD上使用。在Scala中,它也可以在可隐式转换为Writable的类型上使用(Spark包括基本类型的转换,如Int,Double,String等)。saveAsObjectFile(path) (Java and Scala)使用Java序列化以简单格式编写数据集的元素,然后可以使用它进行加载 SparkContext.objectFile()。countByKey()仅适用于类型(K,V)的RDD。返回(K,Int)对的散列映射,其中包含每个键的计数。foreach(func)在数据集的每个元素上运行函数func。这通有副作用,例如更新累加器或与外部存储系交互。 注意:修改除累加器之外的变量foreach()可能会导致未定义的行为。有关详细信息,请参阅了解闭包。Spark RDD API还公开某些操作的异步版本,例如foreachAsync for foreach。
Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark的重新分配数据的机制,因此它可以跨分区进行不同的分组。这通常涉及跨执行程序和机器复制数据,使得混洗成为复杂且昂贵的操作。
为了理解在Shuffle过程中发生的事情,我们可以考虑reduceByKey操作的例子 。该reduceByKey操作生成一个新的RDD,其中单个键的所有值都组合成一个元组 -问题在于,并非单个key的所有值都位于同一个分区,甚至是同一个机器上,但它们必须位于同一位置才能计算结果。所以 必须从所有分区读取以查找所有键的所有值,然后将分区中的值汇总在一起以计算每个键的最终结果 - 这称为shuffle。
如果shuffle之后想要数据是有序的可以使用以下方法:
mapPartitions 对每个分区进行排序 例如 .sortedrepartitionAndSortWithinPartitions 在重新分区的同时有效地对分区进行排序sortBy 制作全局有序的RDD可能导致shuffle操作的一些方法: repartition、coalesce、groupByKey、reduceByKey 、cogroup,join。
shuffle是昂贵的操作,因为它涉及的磁盘I / O,数据序列,和网络I / O。为了组织shuffle的数据,Spark生成了一系列任务 - 映射任务以组织数据,以及一组reduce任务来聚合它。这个术语来自MapReduce,并不直接与Spark map和reduce操作相关。
在内部,各个map任务的结果会保留在内存中,直到它们无法适应。然后,这些基于目标分区进行排序并写入单个文件。在reduce方面,任务读取相关的排序块。
某些shuffle操作会消耗大量的堆内存,因为它们使用内存中的数据结构来在传输记录之前或之后组织记录。具体来说,reduce bykey和aggregatebykey在映射端创建这些结构,而“bykey操作在reduce端生成这些结构。当数据不适合内存时,spark会将这些表溢出到磁盘上,从而导致磁盘I/O的额外开销和增加的垃圾收集。
Shuffle还会在磁盘上生成大量中间文件。从Spark 1.3开始,这些文件将被保留,直到不再使用相应的RDD并进行垃圾回收。这样做是为了在重新计算谱系时不需要重新创建shuffle文件。如果应用程序保留对这些RDD的引用或GC不经常启动,则垃圾收集可能仅在很长一段时间后才会发生。这意味着长时间运行的Spark作业可能会占用大量磁盘空间。spark.local.dir配置Spark上下文时,配置参数指定临时存储目录 。可以通过调整各种配置参数来调整shuffle行为。请参阅“ Spark配置指南 ”中的“随机行为”部分。
Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们。这使得未来的行动更快(通常超过10倍)。缓存是迭代算法和快速交互式使用的关键工具。
您可以使用persist()或cache()方法标记要保留的RDD 。第一次在动作中计算它,它将保留在节点的内存中。Spark的缓存是容错的 - 如果丢失了RDD的任何分区,它将使用最初创建它的转换自动重新计算。
此外,每个持久化RDD可以使用不同的存储级别进行存储,例如,允许您将数据集保留在磁盘上,将其保留在内存中,但作为序列化Java对象(以节省空间),跨节点复制它。通过传递StorageLevel对象(Scala, Java, Python)来设置这些级别 persist()。该cache()方法是使用默认存储级别的简写,即StorageLevel.MEMORY_ONLY(在内存中存储反序列化的对象)。完整的存储级别是:
存储级别含义MEMORY_ONLY将RDD存储为JVM中的反序列化Java对象。如果RDD不适合内存,则某些分区将不会被缓存,并且每次需要时都会重新计算。这是默认级别。MEMORY_AND_DISKStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.MEMORY_ONLY_SER (Java和Scala)将RDD存储为序列化 Java对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用 快速序列化器时,但读取CPU密集程度更高。MEMORY_AND_DISK_SER (Java和Scala)MEMORY_ONLY_SER类似,但将不适合内存的分区溢出到磁盘,而不是每次需要时动态重新计算它们。DISK_ONLY仅将RDD分区存储在磁盘上。MEMORY_ONLY_2,MEMORY_AND_DISK_2等Same as the levels above, but replicate each partition on two cluster nodes.OFF_HEAP(实验性)Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabledSpark的存储级别旨在提供内存使用和CPU效率之间的不同折衷。我们建议您通过以下流程选择一个:
如果您的RDD与默认存储级别(MEMORY_ONLY)保持一致,请保持这种状态。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。
如果没有,请尝试使用MEMORY_ONLY_SER并选择快速序列化库,以使对象更节省空间,但访问速度仍然相当快。(Java和Scala)
除非计算数据集的函数很昂贵,否则它们不会溢出到磁盘,或者它们会过滤大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。
如果要快速故障恢复,请使用复制的存储级别(例如,如果使用Spark来处理来自Web应用程序的请求)。所有存储级别通过重新计算丢失的数据提供完全容错,但复制的存储级别允许您继续在RDD上运行任务,而无需等待重新计算丢失的分区。(Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.)
Spark会自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧数据分区。如果您想手动删除RDD而不是等待它退出缓存,请使用该RDD.unpersist()方法。
通常,当在远程集群节点上执行传递给Spark操作(例如mapor reduce)的函数时,它将在函数中使用的所有变量的单独副本上工作。这些变量将复制到每台计算机,并且远程计算机上的变量的更新不会传播回驱动程序。支持跨任务的通用,读写共享变量效率低下。但是,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
广播变量允许程序员在每台机器上保留一个只读变量,而不是随副本一起发送它的副本。例如,它们可用于以有效的方式为每个节点提供大输入数据集的副本。Spark还尝试使用有效的广播算法来分发广播变量,以降低通信成本。
Spark动作通过一组阶段执行,由分布式“shuffle”操作分隔。Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化形式缓存并在运行每个任务之前反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。
广播变量v :SparkContext.broadcast(v)。广播变量是一个包装器,可以通过调用该value 方法来访问它的值。代码如下:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)直接上例子:
scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10虽然此代码使用了对Long类型的累加器的内置支持,但程序员也可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个必须覆盖的方法:reset用于将累加器重置为零,add用于将另一个值添加到累加器中, merge用于将另一个相同类型的累加器合并到这个中。其他必须覆盖的方法包含在API文档中。例如,假设我们有一个MyVector表示数学向量的类,我们可以写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { private val myVector: MyVector = MyVector.createZeroVector def reset(): Unit = { myVector.reset() } def add(v: MyVector): Unit = { myVector.add(v) } ... } // Then, create an Accumulator of this type: val myVectorAcc = new VectorAccumulatorV2 // Then, register it into spark context: sc.register(myVectorAcc, "MyVectorAcc1")