Spark Streaming之:二、基本输入源

    xiaoxiao2022-07-13  169

    Spark Streaming之:二、基本输入源


    文章目录

    Spark Streaming之:二、基本输入源一、文本文件输入源(数据实时采集)1. 基本步骤2. Spark-shell操作3. IDEA程序4. spark提交流计算任务 二、socket输入源1.使用套接字流作为数据源2.编写程序3.打包上传执行 三、队列流输入源四、Kafka输入源


    —>Spark知识点总结导航<—


    一、文本文件输入源(数据实时采集)

    1. 基本步骤

    (启动MySQL、hive、spark集群,启动数据监听)

    a. 通过创建输入DStream来定义输入流;b. 通过对DStream应用转换操作来定义流计算;c. 用StreamingContext.start()来开始接收数据和处理数据;d. 通过StreamingContext.awaitTermination()方法来等待处理结束;e. 可以通过StreamingContext.stop()来手动结束流计算进程。

    2. Spark-shell操作

    a. 引包

    import org.apache.spark.streaming._

    b. 设置间隔时间

    val ssc=new StreamingContext(sc,Seconds(20))

    c. 读取文件

    val lines=ssc.textFileStream("/home/duck/spark")

    d. 按格式提取数据

    val values=lines.flatMap(_.split(" "))

    val words=values.map(x=>(x,1)).reduceByKey(_+_)

    e. 启动并查看结果

    words.print() ssc.start() //启动流计算

    ssc.awaitTermination() ssc.stop() //手动结束

    3. IDEA程序

    import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} object stream01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("stream01") //创建Streaming程序的入口,间隔时间为20秒 val ssc = new StreamingContext(conf,Seconds(20)) //生成的日志文件清晰可读 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) //获取文件数据 val lines:DStream[String] = ssc.textFileStream("/home/duck/spark") //切割数据 val words:DStream[String] = lines.flatMap(_.split(" ")) //将数据转为k-v形式 val values:DStream[(String,Int)] = words.map(t=>(t,1)) //wordcount汇总 val wordcount:DStream[(String,Int)] = values.reduceByKey(_+_) wordcount.print() ssc.start() ssc.awaitTermination() //ssc.stop() } }

    在控制台中没有什么结果显示,只能通过提交任务。

    4. spark提交流计算任务

    a. 写好程序代码 b. 打包上传 c. 提交spark任务

    bin/spark-submit \ --class com.spark.streaming.stream01 \ /home/duck/Lesson-1.0-SNAPSHOT.jar


    二、socket输入源

    1.使用套接字流作为数据源

    先检测有没有安装:

    nc -lk 9999

    未安装,需要在线安装:

    yum install nc.x86_64

    安装完成!

    2.编写程序

    package com.spark.streaming import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object socket01 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("socket01") val ssc = new StreamingContext(conf,Seconds(20)) val lines:ReceiverInputDStream[String] = ssc.socketTextStream("Cloud00",9999) val words = lines.flatMap(_.split(" ")) val count = words.map(x=>(x,1)).reduceByKey(_+_) count.print() ssc.start() ssc.awaitTermination() } }

    3.打包上传执行

    ./bin/spark-submit \ --class com.spark.streaming.socket01 \ /home/duck/Lesson-1.0-SNAPSHOT.jar


    三、队列流输入源

    package com.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} object QueueStream { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("test") val ssc = new StreamingContext(conf,Seconds(20)) //创建一个可变队列RDD val rddQueue = new scala.collection.mutable.SynchronizedQueue[RDD[Int]]() val queueStream = ssc.queueStream(rddQueue) val mappedStream = queueStream.map(r=>(r,1)) val reducedStream = mappedStream.reduceByKey(_+_) reducedStream.print() ssc.start() for (i<-1 to 10){ rddQueue += ssc.sparkContext.makeRDD(seq = 1 to 100,numSlices = 2) Thread.sleep(1000) } ssc.awaitTermination() ssc.stop() } }

    ***

    四、Kafka输入源

    Kafka之一:Kafka简述Kafka之二:Kafka集群的安装Kafka之三:Kafka集群工作流程Kafka之四:Kafka与Streaming集成
    --->有问题请联系QQ1436281495^_^
    最新回复(0)