Spark Streaming WordCount

    xiaoxiao2025-08-09  9

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} /** * Created by zx on 2017/10/17. */ object SteamingWordCount { def main(args: Array[String]): Unit = { //离线任务是创建SparkContext,现在要实现实时计算,用StreamingContext val conf = new SparkConf().setAppName("SteamingWordCount").setMaster("local[2]") val sc = new SparkContext(conf) //StreamingContext是对SparkContext的包装,包了一层就增加了实时的功能 //第二个参数是小批次产生的时间间隔 val ssc = new StreamingContext(sc, Milliseconds(5000)) //有了StreamingContext,就可以创建SparkStreaming的抽象了DSteam //从一个socket端口中读取数据 //在Linux上用yum安装nc //yum install -y nc val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.1.207", 8888) //对DSteam进行操作,你操作这个抽象(代理,描述),就像操作一个本地的集合一样 //切分压平 val words: DStream[String] = lines.flatMap(_.split(" ")) //单词和一组合在一起 val wordAndOne: DStream[(String, Int)] = words.map((_, 1)) //聚合 val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //打印结果(Action) reduced.print() //启动sparksteaming程序 ssc.start() //等待优雅的退出 ssc.awaitTermination() } }

     

    最新回复(0)