Spark Core:第四章 RDD

    xiaoxiao2025-06-14  16

    Spark Core:第四章 RDD


    文章目录

    Spark Core:第四章 RDD一、RDD简介1. RDD是spark的核心2. RDD基本原理3. RDD特性4. RDD的宽窄依赖 二、RDD的创建1. 从文件创建2. 通过集合创建3. 从其他RDD转化(转换算子)4. 创建指定分区的RDD5. 键值对RDD 三、RDD五大特性四、RDD的基本问题1.K,V格式的RDD是什么?2.sc.textFile(..)3.RDD的分布式体现在哪里?4.RDD的弹性体现在哪里? 五、RDD的操作算子1. 转换操作算子Transformation2. 行动操作算子action3. 持久化算子 六、RDD的编程应用1. 排序2. 多文件排序3. 二次排序4. spark中文件的读取(1) 读本地文件(2) 读Json(3) HBase中读取和写入(4) 从传统数据库中读 5. RDD键值对计算


    —>Spark知识点总结导航<—


    一、RDD简介

      RDD Resillient Distrabuted Dataset( 弹性分布式数据集),是分布式内存一种抽象概念,提供了一种高度受限的共享内存模型(不存数据)

    1. RDD是spark的核心

      spark数据集的表现形式

    partition是RDD最小数据单元,多个partition组成一个RDD

    2. RDD基本原理

     (1) 读入外部数据源进行创建(只读,不能修改)

     (2) 最后一个RDD经过“动作”操作进行转换,并输出到外部数据源

    操作:动作(Action)和转换(Transformation)优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单

    3. RDD特性

     (1) 高效的容错性

       ① 容错机制:数据复制或记录日志

       ② 血缘关系、重新计算丢失分区、无需回滚系统、在不同节点间在重算过程中、记录粗粒度操作

     (2) 中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销

     (3) 存放的都是Java对象,避免了不必要的序列化和反序列化

    4. RDD的宽窄依赖

     (1) 窄依赖

       ① 一个父RDD对应一个子RDD的分区

       ② 多个父RDD对应一个子RDD的分区

      实现流水线优化,可以实现管道化

     (2) 宽依赖(shuffle)

       一个父RDD的一个分区对应一个子RDD的多个分区

      包含shuffle过程(磁盘读写,效率低),无法实现流水线优化


    二、RDD的创建

    1. 从文件创建

     (1) 本地文件

    sc.textFile("/home/duck/software/spark/RELEASE")

     (2) HDFS

    sc.textFile("hdfs://Cloud:9000/RELEASE")

     textFile转换算子,当我们执行上面语句的时候,没有真的去做读文件这个操作

    2. 通过集合创建

    val arr=Array(1,2,3,4,5) sc.textFile(...,minNumpartition) sc.parallelize(arr) sc.makeRDD(arr)

    3. 从其他RDD转化(转换算子)

    4. 创建指定分区的RDD

    sc.textFile("hdfs://Cloud:9000/RELEASE",1) 这里的1就是分区数 sc.parallelize(arr,1)

    5. 键值对RDD

     (1) RDD里面每一个元素都是一个k-v对

     (2) reduceByKey就是只针对键值对的RDD才能用的操作


    三、RDD五大特性

     1.RDD由一系列partition组成

     2.算子(函数)是作用在partition上的

     3.RDD之间有依赖关系

     4.分区器是作用在K,V格式的RDD上

     5.partition对外提供最佳的计算位置,利于数据处理的本地化


    四、RDD的基本问题

    1.K,V格式的RDD是什么?

      RDD中的元素是一个个的Tuple2(二元组)

    2.sc.textFile(…)

      底层调用的是MR读取HDFS的方法,首先也会split,一个split对应一个block,这里的split 也对应一个partition.

    3.RDD的分布式体现在哪里?

      RDD中的partition是分布在多个节点上的

    4.RDD的弹性体现在哪里?

       (1) partition的个数可多可少

       (2) RDD之间有依赖关系

       (3) RDD中是不存数据的


    五、RDD的操作算子

    1. 转换操作算子Transformation

      通过转换算子来对RDD进行操作,懒执行,需要Action算子(行动算子)触发执行

      (1) map:把我们RDD中每个元素都执行map里的(function),形成一个新的RDD,原来的RDD中的元素,与新RDD中的元素一对一 RDD2=RDD1.map(x=>x+2) 等价于RDD2=RDD1.map(_+2)

      (2) flatMap:把RDD中的每个元素都都执行flatMap里的(function),原来的RDD中的每个元素对应新的RDD中的一个或多个元素,那么原则上能用flatMap实现,用map+flatten都能实现

      (3) filter:把RDD中的每个元素都执行filter中的每个function,把结果是true的留下,生成新的RDD,也就是filter的function返回类型必须是Boolean res.filter(_,contains(“hadoop”)) res.collect()

      (4) reduceByKey:针对k-v对的RDD,把RDD中每个元素都按照相同key聚合,(聚合函数自己定义),每个key返回结果值。 reduceByKey((a,b)=>a-b)等价于reduceByKey(-)

      (5) groupByKey:针对k-v的RDD才能使用的算子,把相同key的value聚合成一个valuelist(聚合形式不能自己定义)

    groupByKey+map = reduceByKey (scala,CompactBuffer(1)) (spark,CompactBuffer(1, 1, 1)) (is,CompactBuffer(1, 1, 1)) (hadoop,CompactBuffer(1)) (fast,CompactBuffer(1)) (haoop,CompactBuffer(1)) (hello,CompactBuffer(1, 1, 1)) (good,CompactBuffer(1)) (better,CompactBuffer(1)) val unit1: RDD[(String, Iterable[Int])] = unit.map((_,1)).groupByKey() unit1.map(t=>(t._1,t._2.sum)).foreach(println)

      (6) join:针对k-v的RDD才能使用的算子,直返回两个RDD里面key相同的值,(key,(集合1key的value,集合2key的value))

    leftOutjoin rightOutjoin fullOutjoin sortByKey sortBy mapValue keys values .......

      textFile转换算子,当我们执行上面的语句时并没有真的去读文件的操作,直到我们遇到一个行动算子的时候,我们才真的去读,所以在读文件时生成一个RDD,即使文件不存在也不会报错

    2. 行动操作算子action

      Action算子触发Transformation类算子执行,一个application中有几个Action算子,就有几个job

      (1) foreach()   (2) count():长度(会将结果回收到Driver端)   (3) first:第一个元素   (4) take(num)

      (5) collect()(会将结果回收到Driver端)   (6) saveAsTextFile

      (7) reduce     把每个元素聚合,返回结果      …     first = take(1)

    3. 持久化算子

     (1) 持久化算子的特点

       ① 持久化算子不能直接在action算子之后,经过action算子以后,就不是RDD了

       ② 持久化算子是懒执行的

       ③ 取消持久化用unpersist()

     (2) 持久化策略的原则是

       尽量少用磁盘,尽量少用冗余备份的策略

     (3) 持久化算子的类型

       ① cache()默认将数据存在内存中        =persist()        =persist(StorageLevel.MEMORY_ONLY)     cache()把RDD放在内存

       ② persist()可以手动的指定数据持久化的级别

    persist(StorageLevel.MEMORY_ONLY) MEMORY_ONLY_SER MEMORY_AND_DISK MEMORY_AND_DISK_SER

       ③ checkPoint()可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系

        a. checkPoint()落磁盘,而且进行checkPoint时,必须在sc里指定路径

        b. checkPoint()优化,chekPoint()之前最好做一下cache()

        c. 当lineage非常长,计算又复杂时,可以使用checkpoint对RDD进行持久化,当application执行完毕之后,checkpoint中的数据不会被清除

       ④ chekpoint的执行流程

        a. 当application有action触发执行时,job执行完成之后,会从后往前回溯

        b. 回溯去找有哪些RDD被checkpoint,被checkpoint的做标记

        c. 回溯完成之后,重新计算checkpointRDD的数据,将结果写入指定的checkpoint目录中

        d. 切断RDD的依赖关系

     (4) 总结     a. persist()等价于cache()放内存,当数据大小超过内存大小时,超出部分不放入内存,当进行数据读取时先读内存里的,然后没有持久化的数据,去文件读取

        b. cache和persist 的最小单位partition,都是懒执行,需要action算子触发执行

        c. 对一个RDD使用cache或者persist后,可以赋值给一个变量,下次直接使用这个变量就是使用的持久化的数据

        d. cache和persist之后不能紧跟action算子

        e. 当application执行完毕之后,持久化的数据会被清除


    六、RDD的编程应用

    1. 排序

    object RddAPP { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("TOPN") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("./people") var num = 0 val line: RDD[String] = lines.filter(line=>{ (line.trim().length>0) && (line.split(",").length==4) }) line.map(_.split(",")(2)) .map(x=>(x.toInt,"")) //降序 .sortByKey(false) .map(x=>x._1).take(5).foreach({ num = num + 1 println }) } }

    2. 多文件排序

    object RddAPP02 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("FileSort") val sc = new SparkContext(conf) val files: RDD[String] = sc.textFile("./doc") println(files.getNumPartitions) var index = 0 val result = files.filter(lines=>lines.trim.length>0) .map(line=>(line.trim.toInt,"")) .repartition(1).sortByKey().map(num=>{ index+=1 (index,num._1) }) result.saveAsTextFile("./doc/out") } }

    3. 二次排序

    object SecondarySortKey { def main(args: Array[String]): Unit = { val conf =new SparkConf().setMaster("local").setAppName("SecondarySort") val sc=new SparkContext(conf) val file= sc.textFile("./doc/secondarySort") val pairWithSortKey = file.map(line=>(new SecondarySortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line)) .sortByKey().map(_._2).saveAsTextFile("./doc/out1") } } class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable { override def compare(that: SecondarySortKey): Int = { if (this.first-that.first!=0){ this.first-that.first } else { this.second-that.second } } }

    4. spark中文件的读取

    (1) 读本地文件

    sc.textFile

    (2) 读Json

       ① 使用import scala.util.parsing.json.JSON包里的parseFull(),返回的是Option(map: Map[Any, Any]),然后使用模式匹配。

    object RDD02 { def main(args: Array[String]): Unit = { //从Json文件中读取 val conf = new SparkConf().setMaster("local").setAppName("jsonDemo") val sc = new SparkContext(conf) val file:RDD[String] = sc.textFile("E:\\.people.json") file.foreach(print) //解析 val result:RDD[Option[Any]] = file.map(line=>JSON.parseFull(line)) result.foreach({ r => r match { case Some(map:Map[String,Any]) => println(map) case None => println("解析错误") case _ => println("格式错误") } }) } }

       ② 使用的是

    import org.json4s.jackson.JsonMethods._ import org.json4s.ShortTypeHints import org.json4s.jackson.Serialization

    三个包(可以使用样例类进行模式匹配)

    case class Person(name:String,age:Int){ def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("JSON") val sc = new SparkContext(conf) //隐式转换 implicit val formats = Serialization.formats(ShortTypeHints(List())) val input = sc.textFile("E:\\.people.json") input.collect().foreach(x => {val c = parse(x).extract[Person] println(c.name+"->"+c.age) }) } }

       ③ json数据主要用sparkSql来解析更加方便

    (3) HBase中读取和写入

       ① 读取

    object HBaseInput { def main(args: Array[String]): Unit = { val conf : Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum","192.168.31.130") conf.set(TableInputFormat.INPUT_TABLE,"stu") val sparkConf : SparkConf = new SparkConf().setMaster("local").setAppName("hbaseSpark") val sc = new SparkContext(sparkConf) val stuRDD: RDD[(ImmutableBytesWritable,Result)] = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) stuRDD val count = stuRDD.count() println("student RDD Count:"+count) stuRDD.foreach({ case (_,result: Result)=>{ val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes(),"name".getBytes())) val gendar = Bytes.toString(result.getValue("info".getBytes(),"gender".getBytes())) val age = Bytes.toString(result.getValue("info".getBytes(),"age".getBytes())) println("rowKey:=="+key+" Name:=="+name+" gendar:=="+gendar+"age:=="+age) } }) } }

       ② 写入

    object HBaseOutput { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("hbaseSpark") val sc = new SparkContext(conf) val hbaseconf = HBaseConfiguration.create() hbaseconf.set("hbase.zookeeper.quorum","192.168.31.130") hbaseconf.set(TableOutputFormat.OUTPUT_TABLE,"stu") //sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"student") val job = new Job(hbaseconf) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) val indataRDD = sc.makeRDD(Array("3,wangwu,M,26","4,maliu,F,27")) val rdd = indataRDD.map(_.split(",")).map(arr=>{ val put = new Put(Bytes.toBytes(arr(0))) put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("gendar"),Bytes.toBytes(arr(2))) put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3))) (new ImmutableBytesWritable,put) }) rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) } }

    (4) 从传统数据库中读

    5. RDD键值对计算

    def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("Combine") val sc = new SparkContext(conf) val data = sc.parallelize(Array(("company-1",92),("company-1",85),("company-1",82), ("company-2",78),("company-2",96),("company-2",85), ("company-3",88),("company-3",94),("company-3",80))) val res = data.combineByKey( /** * ("company-1",92),("company-1",85),("company-1",82), * ("company-2",78),("company-2",96),("company-2",85), * ("company-3",88),("company-3",94),("company-3",80) */ (income)=>(income,1), /** * ("company-1",92,1),("company-1",85,1),("company-1",82,1), * ("company-2",78,1),("company-2",96,1),("company-2",85,1), * ("company-3",88,1),("company-3",94,1),("company-3",80,1) */ (acc:(Int,Int),income)=>{(acc._1+income,acc._2+1)}, /** * ("company-1",259,3) * ("company-2",259,3) * ("company-3",262,3) */ (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2) ).map{ case (key,value)=>(key,value._1,value._1/value._2.toFloat) } res.repartition(1).saveAsTextFile("E:\\result") // (company-2,259,86.333336) // (company-3,262,87.333336) // (company-1,259,86.333336) }


    --->有问题请联系QQ1436281495^_^
    最新回复(0)