Parts of this section are drawn from the official documentation: http://spark.apache.org/docs/latest/streaming-programming-guide.html
A concrete example of DStream transformation operations:
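    // Monitor the local ~/streaming directory for new files
    // (ssc is an already created StreamingContext)
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(x => (x, 1))
    val wordCounts = wordMap.reduceByKey(_ + _)
    val filteredWordCounts = wordCounts.filter(_._2 > 1)
    val numOfCount = filteredWordCounts.count()
    val countByValue = words.countByValue()
    // word1 is not defined in the original snippet; it is assumed here to be a
    // second stream with the same contents as words, which matches the union output
    val word1 = lines.flatMap(_.split(" "))
    val union = words.union(word1)
    val transform = words.transform(x => x.map(x => (x, 1)))

    // Print the original file contents
    lines.print()
    // Print the flatMap result
    words.print()
    // Print the map result
    wordMap.print()
    // Print the reduceByKey result
    wordCounts.print()
    // Print the filter result
    filteredWordCounts.print()
    // Print the count result
    numOfCount.print()
    // Print the countByValue result
    countByValue.print()
    // Print the union result
    union.print()
    // Print the transform result
    transform.print()

The following commands add content to a file while the program is running: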
    root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt

The results of the functions above are as follows:
    -------------------------------------------
    lines.print()
    -------------------------------------------
    A B C D
    A B

    -------------------------------------------
    flatMap result
    -------------------------------------------
    A
    B
    C
    D
    A
    B

    -------------------------------------------
    map result
    -------------------------------------------
    (A,1)
    (B,1)
    (C,1)
    (D,1)
    (A,1)
    (B,1)

    -------------------------------------------
    reduceByKey result
    -------------------------------------------
    (B,2)
    (D,1)
    (A,2)
    (C,1)

    -------------------------------------------
    filter result
    -------------------------------------------
    (B,2)
    (A,2)

    -------------------------------------------
    count result
    -------------------------------------------
    2

    -------------------------------------------
    countByValue result
    -------------------------------------------
    (B,2)
    (D,1)
    (A,2)
    (C,1)

    -------------------------------------------
    union result
    -------------------------------------------
    A
    B
    C
    D
    A
    B
    A
    B
    C
    D
    ...

    -------------------------------------------
    transform result
    -------------------------------------------
    (A,1)
    (B,1)
    (C,1)
    (D,1)
    (A,1)
    (B,1)

Example 2:
The WordCount code demonstrated in the previous lesson only counted the words in each batch separately and did not keep the state of earlier counts. To keep a running count across batches, use the updateStateByKey method. The code below shows how to use updateStateByKey:
    import org.apache.spark.SparkConf
    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming._

    object StatefulNetworkWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // Function literal: adds the values of the current batch to the previous state
        val updateFunc = (values: Seq[Int], state: Option[Int]) => {
          val currentCount = values.sum
          val previousCount = state.getOrElse(0)
          Some(currentCount + previousCount)
        }

        // Input is an iterator of (K, Seq[V], Option[S]); output is an iterator of (K, S)
        // V is the type of the values to be summed, S is the type of the previous state
        val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
          iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
        }

        val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[4]")
        // Process one batch per second
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        // Use the current directory as the checkpoint directory; the use of
        // checkpoints in Spark Streaming is covered later
        ssc.checkpoint(".")

        // RDD holding the initial state
        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

        // Use a socket as the input source; in this example the host is localhost and the port is 9999
        val lines = ssc.socketTextStream(args(0), args(1).toInt)
        // flatMap operation
        val words = lines.flatMap(_.split(" "))
        // map operation
        val wordDstream = words.map(x => (x, 1))

        // Apply updateStateByKey
        val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
          new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
        stateDstream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

The figure below shows the initial values:
[Figure: initial values]

Start the netcat server with the following command:
    root@sparkmaster:~/streaming# nc -lk 9999

Then type:
    root@sparkmaster:~/streaming# nc -lk 9999
    hello

This gives the result shown in the figure below:
[Figure: result after typing hello]
Then type world:
    root@sparkmaster:~/streaming# nc -lk 9999
    hello
    world

This then gives the following result:
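The running totals can also be traced by hand outside Spark. The sketch below is not Spark code: it applies the same updateFunc to plain Scala collections, one batch at a time. The object name StatefulReplay, the helper replay, and the assumption that hello and world arrive in two separate one-second batches are illustrative choices, not part of the original program.

```scala
// Plain-Scala replay of the stateful count; no Spark required.
object StatefulReplay {
  // Same update function as in StatefulNetworkWordCount.
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  // Hypothetical helper: applies one batch of words to the previous state.
  // Every key seen so far is visited, even when the batch has no new values
  // for it, which mirrors how updateStateByKey treats existing keys.
  def replay(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
    // Turn the batch into (word -> Seq(1, 1, ...)), like map + grouping by key.
    val newValues: Map[String, Seq[Int]] =
      batch.groupBy(identity).map { case (w, ws) => (w, ws.map(_ => 1)) }
    val keys = state.keySet ++ newValues.keySet
    keys.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty), state.get(k)).map(s => (k, s))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val initial = Map("hello" -> 1, "world" -> 1) // mirrors initialRDD
    val afterHello = replay(initial, Seq("hello")) // assumed first batch
    val afterWorld = replay(afterHello, Seq("world")) // assumed second batch
    println(afterHello)
    println(afterWorld)
  }
}
```

Under these assumptions the state after the first batch is hello = 2 and world = 1, and after the second batch it is hello = 2 and world = 2. A batch with no input leaves every count unchanged, because updateFunc then sums an empty sequence and adds the previous state.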