RDD里面记录的是描述信息(从哪里读数据,以后对数据如何进行计算),RDD的方法分为两类Transformation(lazy),Action(生成Task,并发送到Executor中执行) Scala存储的是真正要计算的数据,执行方法后立即返回结果。
1.一系列分区 2.每一个输入切片会有一个函数作用在上面。 3.RDD和RDD之间存在依赖关系(是父RDD调用什么方法,传入那些函数得到的) 4.(可选择)RDD中如果存储的是KV,shuffle时会有一个分区器,默认是hashPartitioner 5.(可选择)RDD如果是读取HDFS中的数据,会有一个最优位置。(可以根据最优位置将Task发送到特定的Executor去执行,从而减少不同机器将网络传输数据)
作用:根据key进行聚合,需要传入三个函数。 第一个函数: 第二个函数:同一个分区中进行局部操作 第三个函数:全局操作 注意:combineByKey是比较底层的方法,传函数时必须要指定参数的类型。 pairRDD.combineByKey(x => x, (m: Int, n: Int) => m + n, (a: Int, b: Int) => a + b)
第一个函数:将相同key的第一个value取出来进行操作,放入到ListBuffer集合中。 第二个函数:在每一个分区中,将key相同的其他元素加入进来到ListBuffer集合中。 第三个函数:key相同的数据发送到相同的分区,将多个ListBuffer合并成一个ListBuffer。 解析如下图:
val rdd7 = rdd6.combineByKey(x => ListBuffer(x), (m: ListBuffer[String], n: String) => m += n, (a: ListBuffer[String], b: ListBuffer[String]) => a ++= b, new HashPartitioner(2), true, null) 参数:true 表示参数在分区内进行局部聚合。 HashPartitioner需要手动导入 import org.apache.spark.HashPartitioner
操作数据
http://bigdata.edu360.cn/laozhang http://bigdata.edu360.cn/laozhang http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laoduan http://bigdata.edu360.cn/laoduan http://javaee.edu360.cn/xiaoxu http://javaee.edu360.cn/xiaoxu http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laoduan http://bigdata.edu360.cn/laoduan http://javaee.edu360.cn/xiaoxu http://javaee.edu360.cn/xiaoxu http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laozhao http://bigdata.edu360.cn/laoduan http://bigdata.edu360.cn/laoduan http://javaee.edu360.cn/xiaoxu http://javaee.edu360.cn/xiaoxu http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://javaee.edu360.cn/laoyang http://php.edu360.cn/laoli http://php.edu360.cn/laoliu http://php.edu360.cn/laoli http://php.edu360.cn/laoli注意:如果一个分组中的数据量较小时,可以使用该方法;如果一个分组中的数据量很大,使用该方法可能会造成内存溢出。
package day3 import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //分别求每个学科中,前3名的老师 object FavTeacher { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]") val sc =new SparkContext(conf) //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) //(subject, teacher) ((subject, teacher), 1) }) //和一组合在一起(不好,调用两次map方法) //val map: RDD[((String, String), Int)] = sbjectAndTeacher.map((_, 1)) //聚合,将学科和老师联合当做key val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_) //分组排序,(按学科进行分组) //[学科,该学科对应的老师的数据] //使用迭代器不使用数组的原因:从不同的分区中拉取数据,数据量的大小不能确定,所以不能使用数组。 val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1) //经过分组后,一个分区内可能有多少个学科的数据,一个学科就是一个迭代器 //将每一个组拿出来进行操作,为了方便排序 //为什么可以调用Scala上面的sortby方法呢?因为一个学科的数据已经在一台机器上的一个Scala集合里了。 //mapValues方法得到的是iterable,调用Scala语言的toList方法,将iterable转换成List。通过Scala提供的方法进行排序。 val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(3)) //收集结果 val r: Array[(String, List[((String, String),Int)])] = sorted.collect() //打印 println(r.toBuffer) sc.stop() } }作用:通过过滤将每一个学科的数据提取出来,利用RDD存储优势(内存+磁盘)进行排序。
package day3 import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} //适用于排序数据量比较大的情况 object FavTeacher2 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]") val sc =new SparkContext(conf) val subjects = Array("php", "javaee", "bigdata") //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //聚合,将学科和老师联合当做key val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_) //scala的集合排序是在内存中进行的,但是内存有可能不够用 //可以调用RDD的sortBy方法,内存+磁盘进行排序(适用于排序数据量大的情况) for(sb <- subjects){ //该RDD中对应的数据仅有一个学科的数据 //将每一个学科的数据过滤出来 val filtered: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb) //现在调用的是RDD的sortBy方法(内存+磁盘) val favTeacher = filtered.sortBy(_._2, false).take(3) //take是一个Action方法,最终结果通过网络收集到driver端。 println(favTeacher.toBuffer) } sc.stop() } }分区器的作用:决定上游的数据到下游的那个分区中。 作用:将相同学科的数据shuffle到同一个分区中。
package day3 import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} import scala.collection.mutable //通过这种方法会产生两次shuffle object FavTeacher3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]") val sc = new SparkContext(conf) val topN = 3 //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //聚合,将学科和老师联合当做key val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_) //shuffle //计算有多少学科 //distinct方法也要shuffle,去重。 val subjects: Array[String] = reduced.map(_._1._1).distinct().collect() //自定义一个分区,并且按照指定的分区器进行分区 val sbPatitioner = new SubjectParitioner(subjects) //partitionBy按照分区规则进行分区 //调用partitionBy时,RDD的key是(String,String) val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner) //shuffle //如果一次拿出一个分区(可以操作一个分区中的数据了) //mapPartitions会一次拿到一个分区。 val sorted: RDD[((String,String), Int)] = partitioned.mapPartitions(it => { //将迭代器转换成list,然后排序,在转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator }) val r: Array[((String, String), Int)] = sorted.collect() println(r.toBuffer) sc.stop() } } //自定义分区器 class SubjectParitioner(sbs: Array[String]) extends Partitioner{ //相当于主构造器(new的时候会执行一次) //用于存放规则的一个map val rules = new mutable.HashMap[String, Int]() var i = 0 for(sb <- sbs){ rules.put(sb, i) i += 1 } //返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = sbs.length //根据传入的Key计算分区编号 //key是一个元组(String, String) override def getPartition(key: Any): Int = { //获取学科名称 val subject = key.asInstanceOf[(String, String)]._1 //根据规则计算分区编号 rules(subject) } }shuffle数据的传输需要通过网络,减少shuffle能够有效降低网络传输压力。
package day3 import java.net.URL import org.apache.spark.rdd.RDD import org.apache.spark.{Partitioner, SparkConf, SparkContext} import scala.collection.mutable //能够减少一次shuffle object FavTeacher3 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]") val sc = new SparkContext(conf) val topN = 3 //指定以后从哪里读取数据 val lines: RDD[String] = sc.textFile(args(0)) //整理数据 val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => { val index = line.lastIndexOf("/") val teacher = line.substring(index + 1) val httpHost = line.substring(0, index) val subject = new URL(httpHost).getHost.split("[.]")(0) ((subject, teacher), 1) }) //计算有多少学科 val subjects: Array[String] = sbjectTeacherAndOne.map(_._1._1).distinct().collect() //自定义一个分区,并且按照指定的分区器进行分区 val sbPatitioner = new SubjectParitioner(subjects) //聚合,会shuffle一次,相比之前少了一次shuffle. //该RDD一个分区内仅有一个学科的数据 val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(sbPatitioner, _+_) //如果一次拿出一个分区(可以操作一个分区中的数据了) val sorted: RDD[((String,String), Int)] = reduced.mapPartitions(it => { //将迭代器转换成list,然后排序,在转换成迭代器返回 it.toList.sortBy(_._2).reverse.take(topN).iterator }) val r: Array[((String, String), Int)] = sorted.collect() println(r.toBuffer) sc.stop() } } //自定义分区器 class SubjectParitioner(sbs: Array[String]) extends Partitioner{ //相当于主构造器(new的时候会执行一次) //用于存放规则的一个map val rules = new mutable.HashMap[String, Int]() var i = 0 for(sb <- sbs){ rules.put(sb, i) i += 1 } //返回分区的数量(下一个RDD有多少分区) override def numPartitions: Int = sbs.length //根据传入的Key计算分区编号 //key是一个元组(String, String) override def getPartition(key: Any): Int = { //获取学科名称 val subject = key.asInstanceOf[(String, String)]._1 //根据规则计算分区编号 rules(subject) } }生成了两种Task,四个Task。 第一种ShuffleMapTask:读取数据,处理数据,将数据写入磁盘。 第二种ResultTask:通过网络拉去数据,对数据进行处理,将数据写入HDFS。 注意: 生成几种Task与shuffle有关,上图中一共有两种Task四个Task,生成了6个RDD,有2个Stage。
package cn.edu360.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object ScalaWordCount { def main(args: Array[String]): Unit = { //创建spark配置,设置应用程序名字 val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[*]") //创建spark执行的入口 val sc = new SparkContext(conf) //指定以后从哪里读取数据创建RDD(弹性分布式数据集) //sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_) val lines: RDD[String] = sc.textFile(args(0)) //切分压平 val words: RDD[String] = lines.flatMap(_.split(" ")) //将单词和一组合 val wordAndOne: RDD[(String, Int)] = words.map((_, 1)) //按key进行聚合 val reduced:RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) //排序 val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) //将结果保存到HDFS中 sorted.saveAsTextFile("hdfs://node1:9000/spark_countword") lines.count() //释放资源 sc.stop() } }