RDD操作

    xiaoxiao2024-10-19  80

    文章目录

    从宏观角度看RDD OperationsRDD map算子详解RDD filter结合map算子详解RDD mapValues算子详解RDD常用action算子详解 看官网

    http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations

    从宏观角度看RDD Operations

    官网写的很清楚: RDD支持两种类型的操作:

    transformations:转换操作 从一个已经存在的数据集里来创建一个新的数据集。 体现了:RDDA =>RDDB RDD是不可变的,从RDDA产生了RDDB,这两个RDD不是一个东西。RDDB依赖于RDDA。actions 一个操作 它在一个数据集之上运行一个计算之后,返回一个值到我们的driver program(可以理解为一个客户端,比如启动的spark-shell)cache (这个也可以当做一个算子) 上面算是三大类 比如map是一个转换操作,它通过一个函数传递每个数据集元素,就是把这个函数作用到数据集里每一个元素之上,然后返回一个新的RDD,这个新的RDD代表这个结果集。 reduce是一个操作,它使用一些函数把数据集里所有的元素进行聚合,并返回最终一个结果到我们的客户端。

    Spark 里面所有的transformations 都是lazy的。lazy可以理解为用的时候才去加载。它并不会立刻去计算它们的结果。它们仅仅是记住作用在一个数据集比如文件上面的关系。比如RDDA.map(…).filter(…),这个,它仅仅是记住了有个map作用在了RDDA上面,有一个filter作用在了RDDA.map(…)的结果这个之上。它记住了这些转换关系。 只有当一个action需要返回一个结果到客户端上,就是当触发一个action的时候,这个transformations 才会被计算。这种设计使得spark运行的更加高效。 为什么会更高效?比如,一个数据集通过一个map被创建,这个map会在一个reduce里被使用。仅仅返回reduce的结果到客户端,而不是更大的map的数据集。比如RDDA.map(…).reduce(…),这个,它只返回一个reduce的结果,而不是像MapReduce那样,map之后还要输出到本地。(还有一个点是lineage,后面再学)。

    每个transformed RDD在运行一个action的时候有可能会被重新计算。比如你 某一个RDD的前面一个RDD的某个partition数据丢了或者挂了,它会通过依赖通过血缘关系去找到丢失的那个RDD的上一层父层的RDD的那个partition拿出来重新计算。另外,你也可能会用persist或者 cache方法 把一个RDD保存到内存中,这种情况下,你后面如果再次访问这些数据,会更快的去访问。所以如果你将来还会再用到这些数据,你可以把它cache住。当然,它也支持把RDD保存在磁盘或者以多副本的方式存储在多个节点之上。

    RDD map算子详解

    常用的Transformations: 下面来看一下map(func) Return a new distributed dataset formed by passing each element of the source through a function func. 通过一个函数,作用于数据源里的每个元素,返回一个新的分布式数据集。 map:对RDD中的每个元素都执行一个函数。 举例说明:

    scala> val a = sc.parallelize(1 to 9) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 //a是一个RDD,它是org.apache.spark.rdd下面的RDD,这个RDD里面装的类型是Int类型(自动推导的),是一个并行化集合的方式 scala> a.map(x => x * 2) res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26 //并没有把a.map(x => x * 2)这个结果赋给一个变量,系统会自动赋给res1 scala> val b = a.map(x => x * 2) b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25 //对a这个RDD里的每个元素进行乘以2的操作,并把结果赋值给另一个RDD b scala> b.collect res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18) //把b这个RDD里的元素 以数组的形式 返回给控制台(spark-shell)

    再举例:

    scala> val a = sc.parallelize(List("zhangsan","lisi","wangwu","zhaoliu","sunqi")) a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24 //a这个RDD里的元素类型是String类型 scala> val b = a.map(x => (x,1)) b: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:25 //给a里面的每个元素变成(x,1),b这个RDD里的元素类型是(String, Int)类型 ,它是一个tuple元组 scala> b.collect res3: Array[(String, Int)] = Array((zhangsan,1), (lisi,1), (wangwu,1), (zhaoliu,1), (sunqi,1)) //返回数组,元素类型是(String, Int) //对于map,比如 val b = a.map(x => (x,1))这个代码,你去使用RDD的api进行编程和你去使用scala进行编程,是一模一样的

    RDD filter结合map算子详解

    filter(func) : Return a new dataset formed by selecting those elements of the source on which func returns true. filter:对元素进行过滤,把需要的给过滤下来。 举例:

    scala> val a = sc.parallelize(1 to 10) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24 scala> a.filter(x => x%2 == 0).collect res9: Array[Int] = Array(2, 4, 6, 8, 10) scala> a.filter(_%2 == 0).collect res10: Array[Int] = Array(2, 4, 6, 8, 10) scala> a.filter(_ < 4).collect res12: Array[Int] = Array(1, 2, 3) scala> val a = sc.parallelize(1 to 6) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24 scala> val mapRdd = a.map(_ * 2) mapRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[14] at map at <console>:25 scala> mapRdd.collect res13: Array[Int] = Array(2, 4, 6, 8, 10, 12) scala> val filterRdd = mapRdd.filter(_ > 5) filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at filter at <console>:25 scala> filterRdd.collect res14: Array[Int] = Array(6, 8, 10, 12)

    a.filter(x => x%2 == 0) a.filter(_%2 == 0) 这两种写法是两种形式,是一样的,下面是上面的简写。 开发过程中最好采用链式编程。 比如:

    scala> sc.parallelize(1 to 10).map(_ * 2).filter(_ > 6).collect res15: Array[Int] = Array(8, 10, 12, 14, 16, 18, 20)

    RDD mapValues算子详解

    mapValues这个api只对(key,value)中的value做操作,不对key做操作。 举例:

    scala> val a = sc.parallelize(List("zhangsan","lisi","wangwu","zhaoliu","sunqi")) a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24 scala> val b = a.map(x =>(x.length,x)) b: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[13] at map at <console>:25 scala> b.collect res8: Array[(Int, String)] = Array((8,zhangsan), (4,lisi), (6,wangwu), (7,zhaoliu), (5,sunqi)) scala> b.mapValues("I love " + _ + " very much").collect //_表示里面的每个元素 res10: Array[(Int, String)] = Array((8,I love zhangsan very much), (4,I love lisi very much), (6,I love wangwu very much), (7,I love zhaoliu very much), (5,I love sunqi very much))

    RDD常用action算子详解

    reduce(func)这个api传进来两个参数,返回一个参数,进行的是两两操作,前面两个做完,得到一个结果,这个结果再与第三个进行操作…

    scala> val a = sc.parallelize(List("zhangsan","lisi","wangwu","zhaoliu","sunqi")) a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[12] at parallelize at <console>:24 scala> a.count res11: Long = 5 scala> a.first res24: String = zhangsan scala> a.take(1) res25: Array[String] = Array(zhangsan) //取前2 从大到小逆向取 scala> a.top(2) res28: Array[String] = Array(zhaoliu, zhangsan) scala> val a = sc.parallelize(1 to 100) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24 scala> a.sum res12: Double = 5050.0 scala> a.sum.toInt res13: Int = 5050 scala> a.reduce(_ + _) res14: Int = 5050 scala> a.reduce((x,y) => (x+y)) res15: Int = 5050 //top 取前三个(先做了排序) scala> sc.parallelize(Array(2,8,5,12,6,3,5)).top(3) res27: Array[Int] = Array(12, 8, 6) //如果想顺序翻转过来,从小到大,取前三个,需要隐式转换一下 //自定义排序的扩展 //隐式转换写太多 可能会乱 也会冲突 scala> implicit val myOrder = implicitly[Ordering[Int]].reverse myOrder: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@9ddb540 scala> sc.parallelize(Array(2,8,5,12,6,3,5)).top(3) res29: Array[Int] = Array(2, 3, 5) scala> val a = sc.parallelize(Array(2,8,5,12,6,3,5)) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:26 scala> a.max res31: Int = 2 //这个不对,是因为上面做了隐式转换,所以需要把spark-shell关掉打开一下 //所以隐式转换不要随意使用 //重新打开spark-shell后 scala> val a = sc.parallelize(Array(2,8,5,12,6,3,5)) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> a.max res0: Int = 12 scala> a.min res1: Int = 2

    takeSample(boolean, sampleNum,seed):该函数是抽取随机数。 takeOrdered

    面试题:flatMap和map的区别? map操作,按照Spark里面的说就是,将一个RDD中的每一个元素都执行一个指定的函数,产生一个新的RDD,两个RDD中的元素一 一对应,除此之外,生成新的RDD与原来的RDD分区个数一样。 map(x => x2):x => x2 是一个函数,x是传入参数即RDD的每个元素,x*2是返回值 map(x => (x,1)): 该函数将RDD的每个元素(例如:a,b,c…)变为:(a,1),(b,1),(c,1)…键值对的形式 map(x => x.split("\s+")): 该函数以行为分割,将每行变为一个array。 比如:

    scala> val a = sc.parallelize(1 to 6) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> a.map(x => (x,"one")).collect //每个元素后加上一个字符串“one”,变成键值对。 res0: Array[(Int, String)] = Array((1,one), (2,one), (3,one), (4,one), (5,one), (6,one)) //现在HDFS上有个文件是这样的 [hadoop@hadoop001 data]$ hdfs dfs -cat /data/wordcount.txt world world hello China hello people person love scala> val b = sc.textFile("hdfs://hadoop001:9000/data/wordcount.txt") b: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:9000/data/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24 scala> b.collect res1: Array[String] = Array(world world hello, China hello, people person, love) scala> b.map(x =>x.split("\\s+")).collect res3: Array[Array[String]] = Array(Array(world, world, hello), Array(China, hello), Array(people, person), Array(love))

    flatMap与map有些类似,区别是map将原RDD中的每个元素处理后只生成一个新的元素,而flatMap压压扁,可以生成多个元素。

    scala> b.flatMap(x =>x.split("\\s+")).collect res6: Array[String] = Array(world, world, hello, China, hello, people, person, love) scala> b.flatMap(x =>x.split("\\s+")).distinct.collect res7: Array[String] = Array(love, hello, world, people, China, person)
    最新回复(0)