Spark

    xiaoxiao2023-10-21  154

    文章目录

    RDD简介RDD常用的算子通过并行化scala集合创建RDDunion求并集intersection求交集join(连接)groupByKeycogroupcartesian笛卡尔积WordCount RDD中常见的Action方法collectreducecounttop排序取前两个take从头开始取前两个first(similer to take(1))从头开始取第一个takeOrdered取前三个 RDD分区的数量取决于那些因素 RDD中复杂的算子mapPartitionsWithIndexaggregate(初始值)(运算逻辑)AggregateByKeycollectAsMapcountByKeyfilterByRangeflatMapValuesfoldByKeyforeachPartitionforeach RDD分类Transformation包括Action包括

    RDD简介

    使用spark-submit客户端实例: 注意:如果执行的操作是TransForMation类型,即便有错误也可以执行(这里的错误指的是非语法错误),等到执行Action操作时,才会抛出异常。

    Internally, each RDD is characterized by five main properties:(RDD特征) A list of partitions A function for computing each split A list of dependencies on other RDDs Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

    1.RDD是一个基本的抽象,操作RDD就像操作一个本地集合一样,降低了编程的复杂度。 RDD的方法分为两类:Transformation(懒 lazy), Action RDD不存在真正要计算的数据,而是记录了RDD的转换关系(调用了什么方法,传输什么函数)

    创建RDD有那些方式呢? 1》通过外部的存储系统创建RDD 2》将Driver的Scala集合通过并行化的方式编程RDD(测试,实验),集合中存放的数据是有限的,生产环境没有意义。如下图 3》调用一个已经存在了的RDD的Transformation,会产生一个新的RDD RDD的Transformation的特点: 1》lazy 2》生成新的RDD

    RDD常用的算子

    通过并行化scala集合创建RDD

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(*2).sortBy(x=>x,true) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(*2).sortBy(x=>x+"",true) 注意:sortBy()方法中只是传递的排序规则,并不会改变新生成RDD的数据类型。 val rdd4 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”)) rdd4.flatMap(_.split(’ ')).collect

    val rdd5 = sc.parallelize(List(List(“a b c”, “a b b”),List(“e f g”, “a f g”), List(“h i j”, “a a b”))) 注意:第一个flatMap方法是RDD的方法,第二个flatMap方法是List的方法。

    union求并集

    注意类型要一致 val rdd6 = sc.parallelize(List(5,6,4,7)) val rdd7 = sc.parallelize(List(1,2,3,4)) val rdd8 = rdd6.union(rdd7) rdd8.distinct.sortBy(x=>x).collect 执行结果:res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)

    intersection求交集

    val rdd9 = rdd6.intersection(rdd7)

    join(连接)

    val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 2), (“kitty”, 3))) val rdd2 = sc.parallelize(List((“jerry”, 9), (“tom”, 8), (“shuke”, 7), (“tom”, 2))) val rdd3 = rdd1.join(rdd2) res0: Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9))) #左外连接 val rdd3 = rdd1.leftOuterJoin(rdd2) res0: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(2))), (tom,(1,Some(8))), (kitty,(3,None)), (jerry,(2,Some(9)))) #右外连接 val rdd3 = rdd1.rightOuterJoin(rdd2) res1: Array[(String, (Option[Int], Int))] = Array((shuke,(None,7)), (tom,(Some(1),8)), (tom,(Some(1),2)), (jerry,(Some(2),9)))

    groupByKey

    val rdd3 = rdd1 union rdd2 rdd3.groupByKey res4: Array[(String, Iterable[Int])] = Array((shuke,CompactBuffer(7)), (tom,CompactBuffer(1, 8, 2)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(2, 9)))

    rdd3.groupByKey.map(x=>(x._1,x.2.sum)) rdd3.groupByKey.mapValues(.sum).collect

    cogroup

    val rdd1 = sc.parallelize(List((“tom”, 1), (“tom”, 2), (“jerry”, 3), (“kitty”, 2))) val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2))) val rdd3 = rdd1.cogroup(rdd2) val rdd4 = rdd3.map(t=>(t._1, t._2._1.sum + t._2._2.sum))

    cartesian笛卡尔积

    val rdd1 = sc.parallelize(List(“tom”, “jerry”)) val rdd2 = sc.parallelize(List(“tom”, “kitty”, “shuke”)) val rdd3 = rdd1.cartesian(rdd2)

    WordCount

    sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((,1)).reduceByKey(+).sortBy(.2,false).collect sc.textFile("/root/words.txt").flatMap(x=>x.split(" ")).map((,1)).groupByKey.map(t=>(t._1, t._2.sum)).collect

    RDD中常见的Action方法

    val rdd1 = sc.parallelize(List(1,2,3,4,5), 2)

    collect

    rdd1.collect

    reduce

    val r = rdd1.reduce(+)

    count

    rdd1.count

    top排序取前两个

    rdd1.top(2)

    take从头开始取前两个

    rdd1.take(2)

    first(similer to take(1))从头开始取第一个

    rdd1.first

    takeOrdered取前三个

    rdd1.takeOrdered(3)

    RDD分区的数量取决于那些因素

    1.如果是将Driver端的Scala集合并行化创建RDD,并没有指定RDD的分区,RDD的分区是为该app分配的中的核数。 2.从hdfs中读取数据创建RDD,并且设置允许最小分区数量是1,RDD的分区数量为输入切片的数量(一个数据源文件生成一个分区文件)。不设置最小允许分区的数量,即spark调用textFile时会默认传入2(一个数据源文件生成2个分区文件)。 3.需要读取的文件数量较多,输入切片的数量等同于生成新分区的数量。 4.理想文件大小=数据源总大小/最小分区数量;第n个数据源文件大小/理想文件大小>1.1;将为第n个数据源文件分配2个输入切片;新的分区文件数量>数据源文件数量。

    RDD中复杂的算子

    RDD上面有3个分区,每个分区中记录的是读取那部分数据,SparkSubmit会为每个分区生成对应一个Task,把Task发送到Executor,在Executor上面执行,Executor会一次拿出来一个分区。

    mapPartitionsWithIndex

    transformation 一次拿出来一个分区,(分区中并没有数据,而是记录要读取那些数据,真正生成的Task会读取多条数据),并且可以将分区的编号取出来。 功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道哪一个分区中有那些数据。 val func = (index: Int, it: Iterator[Int]) => { it.map(e => s"part: $index, ele: $e") }

    aggregate(初始值)(运算逻辑)

    action val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2) 第一个函数在分区内做局部聚合,第二个函数将不用分区的值进行叠加。 每个分区中计算的时候会运用一下初始值,最后聚合的时候也会运用初始值。 rdd1.aggregate(5)(math.max(_, _), _ + _) 5=math.max(1,2,3,4,5) 9=math.max(5,5,6,7,8,9) 分区内 19 = 5(初始值) + 5(max) + 9(max) 聚合

    val rdd2 = sc.parallelize(List(“a”,“b”,“c”,“d”,“e”,“f”),2) 出现上面这种情况的原因是:每一个Task都是并行计算的,那个先返回不能确定。 val rdd3 = sc.parallelize(List(“12”,“23”,“345”,“4567”),2)

    val rdd4 = sc.parallelize(List(“12”,“23”,“345”,""),2) rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) 第一个分区:math.min("".length, “12”.length).toString => “0” math.min(“0”.length, “23”.length).toString => “1” 第一分区结果为:“1” 第二个分区:math.min("".length, “345”.length).toString => “0” math.min(“0”.length. “”.length).toString => “0” 第二分区结果为:“0” 所以结果有两种情况:“10"或"01”

    val rdd5 = sc.parallelize(List(“12”,“23”,"",“345”),2) rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

    AggregateByKey

    transformation val pairRDD = sc.parallelize(List( (“cat”,2), (“cat”, 5), (“mouse”, 4),(“cat”, 12), (“dog”, 12), (“mouse”, 2)), 2) pairRDD.aggregateByKey(0)(math.max(_, _), _ + ).collect 第一个+是将同一分区,相同key的value向加。第二个+_是将不同分区内的,相同key的value相加。 注意:AggregateByKey方法的初始值只会在第一次相加的时候应用一次。

    collectAsMap

    作用:将结果以Map的形式收集到客户端。 val rdd = sc.parallelize(List((“a”, 1), (“b”, 2))) rdd.collectAsMap

    countByKey

    val rdd1 = sc.parallelize(List((“a”, 1), (“b”, 2), (“b”, 2), (“c”, 2), (“c”, 1))) rdd1.countByKey rdd1.countByValue

    filterByRange

    作用:过滤指定范围内的数据,前后都是闭区间。 val rdd1 = sc.parallelize(List((“e”, 5), (“c”, 3), (“d”, 4), (“c”, 2), (“a”, 1))) val rdd2 = rdd1.filterByRange(“b”, “d”) rdd2.collect

    flatMapValues

    val rdd3 = sc.parallelize(List((“a”, “1 2”), (“b”, “3 4”))) rdd3.flatMapValues(_.split(" "))

    foldByKey

    作用:将key相同的value进行聚合。 val rdd1 = sc.parallelize(List(“dog”, “wolf”, “cat”, “bear”), 2) val rdd2 = rdd1.map(x => (x.length, x)) val rdd3 = rdd2.foldByKey("")(+) 等同于:rdd2.reduceByKey(+).collect 等同于:rdd2.aggregateByKey(+).collect 聚合操作底层方法是:combineByKeyWithClassTag

    foreachPartition

    action 每次打印一个分区的数据 val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) rdd1.foreachPartition(x => println(x.reduce(_ + _))) rdd1.foreachPartition(it => it.foreach(x => println(x * 10000))) 数据会打印到Executor的日志文件中。

    foreach

    每次打印一条数据,在日志文件中。

    RDD分类

    Transformation包括

    aggregateByKey reduceByKey filter flatMap map mapPartition mapPartitionWithIndex

    Action包括

    collect aggregate saveAsTextFile foreach foreachPartition

    A list of partitions (一系列分区,分区有编号,有顺序的) A function for computing each split (每一个切片都会有一个函数作业在上面用于对数据进行处理) A list of dependencies on other RDDs (RDD和RDD之间存在依赖关系) Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (可选,key value类型的RDD才有RDD[(K,V)])如果是kv类型的RDD,会一个分区器,默认是hash-partitioned Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (可以,如果是从HDFS中读取数据,会得到数据的最优位置(向Namenode请求元数据))
    最新回复(0)