我们知道只要遇到shuffle算子就会产生划分stage,如上划分出了俩个stage,这两个stage的并行度是可以直接代码指定控制的(调优),对于shuffle后的stage称之为ResultStage/FinalStatge,shuffle之前的stage称之为ShuffleMapStage,
对于HDFS一个数据块即是一个分区,即一个task。ResultStage若未指定Patition数,则默认和ShuffleMapStage最后RDD的分区数保持一致可将spark的代码调至1.6版本,查看SparkEnv.class文件。其shuffleManager实例化对象源码如下。
val shortShuffleMgrNames = Map( "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)1.2之前默认使用的是HashShuffleManger、1.2之后默认使用SortHashManger,2.0之后HashShuffleManger被废除。
如上图,对于HashShuffleManager:
每一个MapTask(ShuffleMapStage的task)会生成N个ReduceTask的临时文件,是写入本地磁盘的,默认的配置路径:spark.local.dir=/tmp
ReduceTask回去磁盘拉取这些临时文件放到缓冲区然后进行Reduce计算,缓冲区最大为48M,默认配置:spark.reducer.maxMbInFlight=48,这个默认参数勉强凑合着
写每个文件时都有个内存缓冲Bucket,默认是到32kB一写,生产中肯定不行,(调优点)缓冲太小了,写的太平频繁了,配置文件是:spark.shuffle.file.buffer.kb=32,这个参数生产上至少要调大10倍
产生临时文件数:M*R,若生产上有1000M,1000R,则产生100万个文件,这是万万不可接受的,很容易导致程序崩溃,而且及其耗费性能,是spark的计算瓶颈所在。
缓冲的Bucket占用内存: C R32,若8Core的1000R的作业,则缓冲区占Executror就有 256M,这也是不可接受的。
HashShuffleManager缺点总结:
缺点一:Maptask生成的文件太多缺点二:单个Bucket太小频发访问磁盘针对这种情况设区推出了能将临时文件合并功能:spark.shuffle.consolidateFiles=true,默认是false,此时生成的文件数是 Core*R,但是当R很大时依旧会崩溃。
注意测试的代码不要使用2.x,我这里使用的是1.6,同时要设置ShuffleManager为HASH。
package com.wsk.spark import org.apache.spark.{SparkConf, SparkContext} /** * ShuffleManager * * 1)HashShufflemanager:1.2前默认。2.0版本废弃 * 2)SortShuffleManager: 1.2后默 */ object ShuffleManager { def main(args: Array[String]): Unit = { /** HashShufflemanager生成的临时文件数测试 * */ hashShuffleManagerFilesNumTest() } def hashShuffleManagerFilesNumTest(): Unit = { val conf = new SparkConf() .setMaster("local[2]") .setAppName("HashShuff App") .set("spark.local.dir", "/tmp") .set("spark.shuffle.manager","HASH") val sc = new SparkContext(conf) val rdd = sc.textFile("data/cdnlog/input/CDN_click.log", 10) rdd.flatMap(_.split(",")).coalesce(4).repartition(6).collect() Thread.sleep(2000000) sc.stop() } }