数据序列化,对于任意分布式系统都是性能的关键点
Spark默认使用Java serialization,这个比较低效
推荐使用,Kryo serialization,会比Java序列化,更快更小, Spark使用Twitter chill library(Kryo的scala扩展)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryoserializer.buffer.mb“, 2), 需要大于最大的需要序列化的对象size
之所以,spark不默认使用Kryo,因为Kryo需要显式的注册program中使用到的class,参考
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf)只所以要做注册是因为,在把对象序列化成byte[]时,要记录下classname,classname带namespace一般很长的,所以每个里面加上这个classname比较费空间 在kryo里面注册过后,会用一个int来替代classname 当然不注册kryo也是可以用的,只是会多占空间
Tuning之前需要知道当前dataset的内存消耗是多少, 简单的方法是,以该dataset创建rdd,然后cache 这样从SparkContext的日志里面可以看到每个partition的大小,加一下,就可以得到整个数据集的大小
INFO BlockManagerMasterActor: Added rdd_0_1 in memory on mbk.local:50311 (size: 717.5 KB, free: 332.3 MB) This means that partition 1 of RDD 0 consumed 717.5 KB. 然后可以从几个方面去进行优化,首先需要打开gc日志, adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the Java options
Cache Size Tuning 默认Spark使用60% 的executor memory(spark.executor.memory)来cache RDDs. 也就是说只有40%的memory用于task执行,如果发现频繁gc或是oom,可以调低用于cache的比例, conf.set("spark.storage.memoryFraction", "0.5"), 这样设成50% Advanced GC Tuning Spark做gc tuning的目标是,避免在task执行过程中发生full gc, 即需要让Young区足够容纳short-lived objects a, 如果发生多次full gc或是OldGen已经接近full,说明内存不够,可以降低cache比例 b, 如果很多minor gc,但没有major gc,说明young区过小, 我们可以根据task dataset需要消耗内存来预估eden区,young区大小= eden区 × (4/3),因为要加上survivor区 c, 如果从hdfs读取数据,可以根据hdfs block大小来预估eden区大小,比如,如果解压比例3倍,4个tasks并行,block大小64M,那么eden区大小 = 3×4×64M其他的一些考虑,
调整并发的level, 通过增加并发来降低reduce task的内存消耗
用broadcast functionality来处理大的变量, data locality
本文章摘自博客园,原文发布日期:2015-04-21