Spark学习之编程进阶——累加器与广播(5)
 
  1. Spark中两种类型的共享变量:累加器(accumulator)与广播变量(broadcast variable)。累加器对信息进行聚合,而广播变量用来高效分发较大的对象。
 
  2. 共享变量是一种可以在Spark任务中使用的特殊类型的变量。
 
  3. 累加器的用法:
 
  通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumlator[T]对象,其中T是初始值initialValue的类型。Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是add)增加累加器的值。 
驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue()来访问累加器的值。
 Python中实现累加空行
 
      file = sc.textFile(inputFile)
    
    blankLines = sc.accumulator(
0)
    
def extractCallSigns(Line):
        globle blankLines 
        
if (line == 
""):
            blankLines += 
1
            return line.split(
"")
    callSigns = file.flatMap(extractCallSigns)
    callSigns.saveAsTextFile(outputDir + 
"/callsigns")
    
print "Blank lines:%d" % blankLines.value 
  
4. Spark的广播变量,它可以让程序高效地向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。
 
  Scala代码使用广播变量查询国家
 
      
//查询RDD contactCounts中的呼号的对应位置。将呼号前缀
    
//读取为国家代码进行查询
    val signPrefixes = sc.broadcast(loadCallSignTable())
    val countryContactCounts = contactCounts.map{
case (sign,count) =>
        val country = lookupInArray(sign,signPrefixes.value)
        (country,count)
        }.reduceByKey
((x,y) => x+y)
        countryContactCounts.saveAsTextFile(outputDir + "/countries.text") 
  5. Spark在RDD上提供pipe()方法。Spark的pipe()方法可以让我们使用任意一种语言实现Spark作业中的部分逻辑,只要它的读写Unix标准流就行。
 
  
                
        
 
相关资源:spark中用scala编写累加器小程序统计文章中空白行