一、什么是RDD
二、RDD的源码定义
三、RDD五大特性详解
四、RDD五大特性和源码的对应关系
五、图解RDD
视频来源:若泽数据 链接:https://pan.baidu.com/s/1yWomjC3SAWDT8eh3ZX73pA 提取码:papt 楼主QQ:2032677340RDD:Resilent Distributed Dataset,弹性分布式数据集,是Spark中最基本的数据抽象(the basic abstraction in spark)。 作用:让开发者大大降低开发分布式应用程序的门槛以及提高执行效率。
RDD源码查看地址:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scalaResilent Distributed Dataset:
1、Distributed Dataset 分布式数据集,数据归属在大的机器上,以分区的形式 2、Resilent(弹性怎么理解呢)弹性体现在它的计算之上。 弹性指的是RDD可以做到在分布式计算中容错, 比如:某个节点挂了,某个部分的数据丢失了, 能够通过某些机制进行修复,这个是体现在它计算中的Resilent(弹性)。 Distributed也意味着数据可以存储在不同的节点之上。 Dataset(数据集):可以理解为HDFS中的block、一个文件就是一个Dataset、一个数据结构就是一个数组。Represents an immutable, 代表了一个不可变得==> val修饰 意味着我们的RDD一旦产生就是不可变得,RDDA==>RDDB;RDDB必然是一个新的。
partitioned collection of elements RDD中的元素能够被分区,理解为HDFS中的block、MapReduce中的split
that can be operated on in parallel. RDD中的元素能够以一种并行的方式被操作。 假设RDDA当中的元素有三个分区:一个RDD元素被拆分成了几块,每一块都在不同节点上存储 Hadoop001:Partition1、(1,2,3) Hadoop002:Partition2、(4,5,6) Hadoop003:Partition3、(7,8,9) 假设我们对这个RDDA执行了一个“+1”的操作,operated +1,三个分区分别存放在不同的机器上,operated + 1:是同时对这三个分区进行操作的。 输出: Hadoop001:Partition1、(2,3,4) Hadoop002:Partition2、(5,6,7) Hadoop003:Partition3、(8,9,10)
解释:
抽象类 RDD不能直接new,必然是有子类实现的,我们使用时直接用其子类即可;如下是JdbcRDD,都是继承自RDD类 class JdbcRDD[T: ClassTag]( sc: SparkContext, getConnection: () => Connection, sql: String, lowerBound: Long, upperBound: Long, numPartitions: Int, mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _) extends RDD[T](sc, Nil) with Logging { 序列化Serializable 网络的传输logging 记录日志的,在spark1.6中是能直接使用的,在spark2.0以后被移除了。T 泛型,支持字符串,自定义类型sparkContext@transientInternally, each RDD is characterized by five main properties:
1、A list of partitions
RDD是由一系列的分区所构成的2、A function for computing each split
举例:+ 1操作就是对所有分区都进行的3、A list of dependencies on other RDDs
RDDA --> RDDB --> RDDC --> RDDD,每一个分区间都有依赖关系4、Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
5、Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
相当于是一个数据本地性的概念:你的数据在哪,就把task放到哪去执行,这样就省去了网络传输。源码网址:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala def compute(split: Partition, context: TaskContext): Iterator[T] ==>对应RDD的第二个特性,做计算就是对RDD中的分区做计算。计算方法传入参数肯定有Partition,后面是上下文;Q:coumpute传入的参数有哪些
protected def getPartitions: Array[Partition] 一个RDD是由一系列的分区构成,那他返回的必然是一个数组或集合,集合中存放类型必然是partition ==>对应RDD中第一个特性 a list of partitions
protected def getDependencies: Seq[Dependency[_]] = deps //因为RDD中是有转换的,每一个都有对应关系。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil 有一个方法叫getPreferredLocations,得到最优先的位置,返回的是一个数组或集合的东西,==>对应RDD第五个特点
@transient val partitioner: Option[Partitioner] = None //以键值对方式(key,value)的时候有一个partitioner, ==>对应第四大特点。
打开JdbcRDD.scala override def getPartitions: Array[Partition] = { // bounds are inclusive, hence the + 1 here and - 1 on end val length = BigInt(1) + upperBound - lowerBound (0 until numPartitions).map { i => val start = lowerBound + ((i * length) / numPartitions) val end = lowerBound + (((i + 1) * length) / numPartitions) - 1 new JdbcPartition(i, start.toLong, end.toLong) }.toArray } //获取到分区信息,操作JDBC的方式,代码中有一个起始和结束位置,eg:有10个分区,10000调数据,每个分区处理数据都有对应顺序,比如第一个分区处理第1~1000条数据,第二个分区处理1001~2000的数据。 每个分区做一个map,开始和结束构建出来一个JdbcParttion,就能得到partitions. 2、标准的Jdbc编程 override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T] { context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() } val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) val url = conn.getMetaData.getURL if (url.startsWith("jdbc:mysql:")) { // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force // streaming results, rather than pulling entire resultset into memory. // See the below URL // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html stmt.setFetchSize(Integer.MIN_VALUE) } else { stmt.setFetchSize(100) } logInfo(s"statement fetch size set to: ${stmt.getFetchSize}") stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery() override def getNext(): T = { if (rs.next()) { mapRow(rs) } else { finished = true null.asInstanceOf[T] } } override def close() { try { if (null != rs) { rs.close() } } catch { case e: Exception => logWarning("Exception closing resultset", e) } try { if (null != stmt) { stmt.close() } } catch { case e: Exception => logWarning("Exception closing statement", e) } try { if (null != conn) { conn.close() } logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) } } } } 3、HadoopRDD.scala(读取hadoop文件的) override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) try { val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array //读文件借助于inputformat,对于RDD来说,inputformat.getsplits,拿到一堆inputsplits,partition大小就是inputsplits大小,构建一个数组,每个数组传一个partition,数组大小就是inptsplits.size;迭代inputsplits,每个构建hadoopPartition。解释:如下图RDD有5个分区构成,每个分区中有5个元素;只有三台机器:hadoop001、hadoop002、hadoop003,分区分别存在于这三台机器上;数据既有可能存放在disk上也有可能存放在memory上,由于多副本策略,也有可能一个分区存多份
此时有5个分区,必然会启5个task,如果core够的话,5个task必然会是并行着执行的;core不够的话,会先跑完一轮再跑下一轮。 eg:partition的数据分布在各个节点之上,如果在hadoop002上运行partition1,需要先把partition1的数据复制到hadoop002上才能够做相应的计算,体现的是数据的本地性操作。
Resilent Distributed Dataset(弹性分布式数据集)
Internally,each RDD is characterized by five main properties: 1) A list of partitions 它由很多个分区构成 2) A function for computing each split 对rdd进行+1的操作就是对每个分区做一个相同的function 3)A list of dependencies on other RDDs 举例:RDDA=>RDDB=>RDDC=>RDDD RDD和RDD之间都是有依赖关系的 这是一个流水线式的依赖关系,当某个分区数据丢失,spark会对某一个分区进行计算,前提是窄依赖。 4)optionally, A Partitioner for key-value RDDs(e.g to say that the RDD is hash-partitioned) 类似于MapReduce中按照key-hash进行分发,kafka中数据倾斜的话会把null的值放在一个中去。 5)optionally, a list of preferred locations to compute each split on (eg. block locations on an HDFS file) 数据本地性,数据在哪,需要把task分配到他所在的数据位置 作业执行的时候是以task的方式运行的,作业是以task完成最慢时间来决定的,木桶原理。 任何一个存储必然需要分布式,所以数据存储是需要切分的,以多副本的方式存储便于容错,计算的时候也需要切分,默认是block大小,相当于一个task去处理两个blocksize。 hdfs+mapreduce==>spark1、def compute(split: Partition, context: TaskContext): Iterator[T] 2、protected def getPartitions: Array[Partition] 一个RDD是由若干个集合、分区构成,它返回类型必然是一个数组、集合。这个方法对应RDD的方法中是1。 3、protected def getDependencies: Seq[Dependency[_]] = deps 对应关系==>RDD中的第三个特性 4、protected def getPreferredLocations(split: Partition): Seq[String] = Nil 它要计算split和location, 5、Optionally overridden by subclasses to specify how they are partitioned(可选地由子类重写,以指定如何分区) @transient val partitioner: Option[Partitioner] = None key-value 键值对 再看一个hadoopRDD.scala方法: