—>Spark知识点总结导航<—
RDD Resillient Distrabuted Dataset( 弹性分布式数据集),是分布式内存一种抽象概念,提供了一种高度受限的共享内存模型(不存数据)
spark数据集的表现形式
partition是RDD最小数据单元,多个partition组成一个RDD(1) 读入外部数据源进行创建(只读,不能修改)
(2) 最后一个RDD经过“动作”操作进行转换,并输出到外部数据源
操作:动作(Action)和转换(Transformation)优点:惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单(1) 高效的容错性
① 容错机制:数据复制或记录日志
② 血缘关系、重新计算丢失分区、无需回滚系统、在不同节点间在重算过程中、记录粗粒度操作
(2) 中间结果持久化到内存,数据在内存中的多个RDD操作之间进行传递,避免了不必要的读写磁盘开销
(3) 存放的都是Java对象,避免了不必要的序列化和反序列化
(1) 窄依赖
① 一个父RDD对应一个子RDD的分区
② 多个父RDD对应一个子RDD的分区
实现流水线优化,可以实现管道化
(2) 宽依赖(shuffle)
一个父RDD的一个分区对应一个子RDD的多个分区
包含shuffle过程(磁盘读写,效率低),无法实现流水线优化
(1) 本地文件
sc.textFile("/home/duck/software/spark/RELEASE")(2) HDFS
sc.textFile("hdfs://Cloud:9000/RELEASE")textFile转换算子,当我们执行上面语句的时候,没有真的去做读文件这个操作
(1) RDD里面每一个元素都是一个k-v对
(2) reduceByKey就是只针对键值对的RDD才能用的操作
1.RDD由一系列partition组成
2.算子(函数)是作用在partition上的
3.RDD之间有依赖关系
4.分区器是作用在K,V格式的RDD上
5.partition对外提供最佳的计算位置,利于数据处理的本地化
RDD中的元素是一个个的Tuple2(二元组)
底层调用的是MR读取HDFS的方法,首先也会split,一个split对应一个block,这里的split 也对应一个partition.
RDD中的partition是分布在多个节点上的
(1) partition的个数可多可少
(2) RDD之间有依赖关系
(3) RDD中是不存数据的
通过转换算子来对RDD进行操作,懒执行,需要Action算子(行动算子)触发执行
(1) map:把我们RDD中每个元素都执行map里的(function),形成一个新的RDD,原来的RDD中的元素,与新RDD中的元素一对一 RDD2=RDD1.map(x=>x+2) 等价于RDD2=RDD1.map(_+2)
(2) flatMap:把RDD中的每个元素都都执行flatMap里的(function),原来的RDD中的每个元素对应新的RDD中的一个或多个元素,那么原则上能用flatMap实现,用map+flatten都能实现
(3) filter:把RDD中的每个元素都执行filter中的每个function,把结果是true的留下,生成新的RDD,也就是filter的function返回类型必须是Boolean res.filter(_,contains(“hadoop”)) res.collect()
(4) reduceByKey:针对k-v对的RDD,把RDD中每个元素都按照相同key聚合,(聚合函数自己定义),每个key返回结果值。 reduceByKey((a,b)=>a-b)等价于reduceByKey(-)
(5) groupByKey:针对k-v的RDD才能使用的算子,把相同key的value聚合成一个valuelist(聚合形式不能自己定义)
groupByKey+map = reduceByKey (scala,CompactBuffer(1)) (spark,CompactBuffer(1, 1, 1)) (is,CompactBuffer(1, 1, 1)) (hadoop,CompactBuffer(1)) (fast,CompactBuffer(1)) (haoop,CompactBuffer(1)) (hello,CompactBuffer(1, 1, 1)) (good,CompactBuffer(1)) (better,CompactBuffer(1)) val unit1: RDD[(String, Iterable[Int])] = unit.map((_,1)).groupByKey() unit1.map(t=>(t._1,t._2.sum)).foreach(println)(6) join:针对k-v的RDD才能使用的算子,直返回两个RDD里面key相同的值,(key,(集合1key的value,集合2key的value))
leftOutjoin rightOutjoin fullOutjoin sortByKey sortBy mapValue keys values .......textFile转换算子,当我们执行上面的语句时并没有真的去读文件的操作,直到我们遇到一个行动算子的时候,我们才真的去读,所以在读文件时生成一个RDD,即使文件不存在也不会报错
Action算子触发Transformation类算子执行,一个application中有几个Action算子,就有几个job
(1) foreach() (2) count():长度(会将结果回收到Driver端) (3) first:第一个元素 (4) take(num)
(5) collect()(会将结果回收到Driver端) (6) saveAsTextFile
(7) reduce 把每个元素聚合,返回结果 … first = take(1)
(1) 持久化算子的特点
① 持久化算子不能直接在action算子之后,经过action算子以后,就不是RDD了
② 持久化算子是懒执行的
③ 取消持久化用unpersist()
(2) 持久化策略的原则是
尽量少用磁盘,尽量少用冗余备份的策略
(3) 持久化算子的类型
① cache()默认将数据存在内存中 =persist() =persist(StorageLevel.MEMORY_ONLY) cache()把RDD放在内存
② persist()可以手动的指定数据持久化的级别
persist(StorageLevel.MEMORY_ONLY) MEMORY_ONLY_SER MEMORY_AND_DISK MEMORY_AND_DISK_SER③ checkPoint()可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系
a. checkPoint()落磁盘,而且进行checkPoint时,必须在sc里指定路径
b. checkPoint()优化,chekPoint()之前最好做一下cache()
c. 当lineage非常长,计算又复杂时,可以使用checkpoint对RDD进行持久化,当application执行完毕之后,checkpoint中的数据不会被清除
④ chekpoint的执行流程
a. 当application有action触发执行时,job执行完成之后,会从后往前回溯
b. 回溯去找有哪些RDD被checkpoint,被checkpoint的做标记
c. 回溯完成之后,重新计算checkpointRDD的数据,将结果写入指定的checkpoint目录中
d. 切断RDD的依赖关系
(4) 总结 a. persist()等价于cache()放内存,当数据大小超过内存大小时,超出部分不放入内存,当进行数据读取时先读内存里的,然后没有持久化的数据,去文件读取
b. cache和persist 的最小单位partition,都是懒执行,需要action算子触发执行
c. 对一个RDD使用cache或者persist后,可以赋值给一个变量,下次直接使用这个变量就是使用的持久化的数据
d. cache和persist之后不能紧跟action算子
e. 当application执行完毕之后,持久化的数据会被清除
① 使用import scala.util.parsing.json.JSON包里的parseFull(),返回的是Option(map: Map[Any, Any]),然后使用模式匹配。
object RDD02 { def main(args: Array[String]): Unit = { //从Json文件中读取 val conf = new SparkConf().setMaster("local").setAppName("jsonDemo") val sc = new SparkContext(conf) val file:RDD[String] = sc.textFile("E:\\.people.json") file.foreach(print) //解析 val result:RDD[Option[Any]] = file.map(line=>JSON.parseFull(line)) result.foreach({ r => r match { case Some(map:Map[String,Any]) => println(map) case None => println("解析错误") case _ => println("格式错误") } }) } }② 使用的是
import org.json4s.jackson.JsonMethods._ import org.json4s.ShortTypeHints import org.json4s.jackson.Serialization三个包(可以使用样例类进行模式匹配)
case class Person(name:String,age:Int){ def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("JSON") val sc = new SparkContext(conf) //隐式转换 implicit val formats = Serialization.formats(ShortTypeHints(List())) val input = sc.textFile("E:\\.people.json") input.collect().foreach(x => {val c = parse(x).extract[Person] println(c.name+"->"+c.age) }) } }③ json数据主要用sparkSql来解析更加方便
① 读取
object HBaseInput { def main(args: Array[String]): Unit = { val conf : Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum","192.168.31.130") conf.set(TableInputFormat.INPUT_TABLE,"stu") val sparkConf : SparkConf = new SparkConf().setMaster("local").setAppName("hbaseSpark") val sc = new SparkContext(sparkConf) val stuRDD: RDD[(ImmutableBytesWritable,Result)] = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) stuRDD val count = stuRDD.count() println("student RDD Count:"+count) stuRDD.foreach({ case (_,result: Result)=>{ val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes(),"name".getBytes())) val gendar = Bytes.toString(result.getValue("info".getBytes(),"gender".getBytes())) val age = Bytes.toString(result.getValue("info".getBytes(),"age".getBytes())) println("rowKey:=="+key+" Name:=="+name+" gendar:=="+gendar+"age:=="+age) } }) } }② 写入
object HBaseOutput { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("hbaseSpark") val sc = new SparkContext(conf) val hbaseconf = HBaseConfiguration.create() hbaseconf.set("hbase.zookeeper.quorum","192.168.31.130") hbaseconf.set(TableOutputFormat.OUTPUT_TABLE,"stu") //sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"student") val job = new Job(hbaseconf) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) val indataRDD = sc.makeRDD(Array("3,wangwu,M,26","4,maliu,F,27")) val rdd = indataRDD.map(_.split(",")).map(arr=>{ val put = new Put(Bytes.toBytes(arr(0))) put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("gendar"),Bytes.toBytes(arr(2))) put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3))) (new ImmutableBytesWritable,put) }) rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) } }