spark调优经验总结

    xiaoxiao2022-07-04  136

    1.问题

    1)BAT这样的企业内部是如何开发和运行Spark程序的?

    · 开发和测试

    intellij进行开发,对scala支持的非常好,java也支持的很好,开发好了在local模式下运行进行测试,intellij是不能远程提交到集群的,itellij没有分发jar包功能。

    · spark生产环境运行

    将所有依赖的包打成assembly.jar包,这样不再依赖环境里的任何模式,一般用yarn cluster模式运行(driver可以容错),可以用yarn client模式进行测试。每天要跑,可以使用工作流调度器,按照你的设置定时定点跑程序,如果出错了可以报警,工作流调度引擎,可以处理复杂的依赖,用的比较多的开源

    airflow:轻量级,很多公司用

    oozie:比较复杂

    2)编译spark程序非常慢

    修改mavn pom仓库,将国外的仓库,改成国内的仓库,如阿里云

    3)Spark参数的设置问题

    有三种设置方式,  在程序内设置(优先级高),提交参数设置(优先级中),配置文件配置(优先级低),park-default:常用的可以在配置文件中配

    2.Spark调优初体验

    调优程序,首先得知道程序慢在哪里,要定位问题,找到优化的点。定位问题又涉及到很多层面,机器硬件、网络、操作系统、JVM虚拟机、大数据软件平台层、软件开发层,所以调优是一个综合工程,Spark程序调优可以分为以下几个层面:

    1)基础设施层

    机器硬件(如磁盘的选择,SATA盘还是SAS盘,磁盘RAID方式等)、网络(千兆网卡还是万兆网卡,网络峰值期间的带宽、吞吐、网络延迟、网络抖动,很多时候网络问题导致各种莫名问题,举个真实的例子,公司网线被老鼠咬了,导致网络时而可以,时而不行,鬼知道是什么问题,让人抓狂)、操作系统(操作系统的稳定性,内核版本的选择,非常重要,还有一些配置策略得和hadoop生态吻合)。这些都非常底层了,就网络、linux操作系统就够花时间学习和了解的了,很多时候需要系统集成部的同事一起配合。

    2)大数据平台层:HDFS、YARN、Hive、Spark、JVM等

    JVM的调优,还没有什么经验,而且目前也没有非常深入,最多也就是GC策略挑挑,大数据平台调优真是个技术活,需要很多经验。

    3)程序开发层

    开发工程师程序开发技巧、对业务的理解等

    2.1利用WebUI分析Spark程序瓶颈

    选择合适的资源,前提是了解集群有多少资源 。1)集群内存总量:单台节点共8G,2G给datanode,6G个spark做计算(yarn共有18G,3个节点);2)集群CPU总合数:  每个机器有8个CPU,共24个core;3)集群总存储:总存储空间有多少,一般保证20%左右的剩余空间,要不然会影响集群整体性能,HDFS剩余空间不足也会导致任务执行很慢,甚至失败。

    如何观察和评估程序的效率怎么执行shell,跑spark任务就不讲了,说明一下如何观察Spark任务执行的瓶颈,从哪几个指标观察任务执行的效率。

    指标一:观察Spark任务解析和提交消耗的时间

    主要涉及到的是jar包上传的优化,这里面分为2种情况:

    1)程序依赖的jar,这种通常是spark lib目录下的所有jar包,有好几百兆,spark程序会上传这些,为了提升效率,可以提前上传好,

    2)程序自身的jar包,如果程序不经常变动,也可以提前上传到HDFS上。

    指标二:观察WebUI产生的系统监控数据

     这里面有很多指标,罗列如下

    1)观察job监控参数,产生了多少个job,一个action对应一个job,如果action之间没有依赖关系,资源有富余,可以让job并行执行。

    2)观察stage监控参数,一个job分解成了结果stage,每个stage执行了多少时间,输入了多少数据量,shuffle read了多少数据,shuffle write 了多少数据。

    3)观察executor监控参数,driver在哪里,executor在哪里,每个executor启动了几个CPU核数,起了多少内存,输入多少数据(可以查看数据是否有倾斜),shuffle了多少数据,内存放了多少数据,GC执行了多少时间。还可以查看stderr,查看日志。

    4)观察每个task监控参数,task执行的时间,GC执行的时间,读入的数据总大小和记录数,shuffle的大小,task执行时的本地行,举个执行的任务例子,如下所示

    可以看出来,一共有2个stage,1个stage包含8个task,一个包含2个task,先跑8个的,再跑2个的,一个14秒,一个0.1秒。再看看executor,发现只有2个executor,一个executor只有1个core,也就是一个executor只能处理一个task,集群也就是最多跑2个,10个task要跑5轮

    再点击stage的链接进去,观察每个task跑多长时间

    好了,按照前面罗列的监控指标点,可以看出如下监控参数:

    集群资源:内存3个节点18G,VCORE:3个节点24VCORE

    1)Job监控参数:只有一个Job

    2)Stage监控参数:有2个Stage

    ·  stage1:8个task,这个先跑,执行了14秒

    ·  stage2:2个task,这个后跑,执行了0.1秒,是collect方法,数据汇聚到driver

    3)executor:2个executor,1个executor 1G内存

    4)task:本地性node_local,GC毫秒级,shuffle也是不足Kb

    分析消耗的资源:

    1)从stage的执行时间分析,stage1执行时间长,可以考虑优化stage1

    2)executor分析:只有2个executor,内存也只有1G,启动的都是默认参数,和集群资源相比,有很作资源没有利用起来,有点浪费,可以调整executor个数,增大并发度,或者如果数据量大,也有必要,增大内存。

    3)task:本地行还可以,node级别,gc也可以,shuffle也少

    总结可能优化的点:

    优化stage1,Job中的stage1有8个task,2个executor,需要跑4轮才能跑完第一轮的所有task,调整为8个executor(原来同时跑2个task,现在可以同时跑8个task了),那么只需要一轮就而已跑完所有的task。内存1G也够用了,因为输入数据就不足1G,如果输入数据很大的话,也可以增大内存。

     2.2 设置合适的资源

    设置合适的资源,要明确集群的资源总量,然后观察监控指标,检查是否充分使用了集群中的资源

    资源量关联度比较大的一个是内存一个是CPU的使用率,当然还有其他,但这两个指标是比较重点的,这两个指标对应spark程序主要是Executor的内存和core。所以计算资源的设置单位是Executor

    增加Executor个数:--num-executors 4

    增加每个Executor同时云心的task数目:--executor-cores 2

    除了Executor消耗资源,还有driver和shuffle等也都是消耗资源大户,我把几个常用的和资源使用相关的参数含义及参考值总结如下:

    num-executors

    参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

    参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

    executor-memory

    参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

    参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

    executor-cores

    参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

    参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

    driver-memory

    参数说明:该参数用于设置Driver进程的内存。

    参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

    spark.default.parallelism

    参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

    参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

    spark.storage.memoryFraction

    参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

    参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

    spark.shuffle.memoryFraction

    参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

    参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

    资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

    参数设置demo

    /bin/spark-submit \  

    --master yarn-cluster \  

     --num-executors 100 \  

    --executor-memory 6G \  

     --executor-cores 4 \  

    --driver-memory 1G \  

     --conf spark.default.parallelism=1000 \  

     --conf spark.storage.memoryFraction=0.5 \  

     --conf spark.shuffle.memoryFraction=0.3 \  

    优化后的,一共执行了11秒

    每个task执行时间

    再观察每个task花费时间,观察发现,每个task执行的时间比原来的4秒还长,原因是在资源不变的情况,任务数多了只有2个CPU,CPU变成瓶颈了,要频繁切换如果CPU很多,优化效果会非常好,CPU是瓶颈

    2.3 设置合适的并发度

    任务的并发度有几个级别:job级别的并发,stage级别的并发,task级别的并发。这里谈的并发优化是task级别的,主要就是map任务并行度和reduce任务的并行度,这方面的调优其实可以参考MapReduce的调优,原理都是一样一样的。Spark的任务数需要注意几点:

    1)Map个数默认是和输入文件的blok数是一样的,如hdfs则和 blokc数目一致,hbase则和regio个数一致。

    2)rdd之间的map个数如果不修改,后面的和前面个数一样

    3)reduce默认个数也是和map个数一样

    map设置方法:

    单个设置: sc.textFile("/input/data",100); //指定100个blokc,那么就100个map

    批量设置:将每个map处理数量调大,map数就少了,默认128M

     2.4 修改存储格式

      很多人并不明白为什么文件存储格式会影响文件的读取效率,我打个最简单的比方。我们知道linux 系统是单机版的操作系统,里面有ext3,ext4这样的文件,ext的职责就是对linux系统文件进行管理,HDFS是分布式的文件系统,也是对文件进行管理。好的文件系统就像一个勤快的媳妇,在你房子面积不变的情况下,勤快的媳妇会将物品放的井井有条,利用到房子里面的每个空间,你要拿什么东西,都能很快找到;而不好的文件系统就像是一个懒媳妇,房间里面堆满了东西,找起来很麻烦,房间利用率也非常糟糕,找东西困难,放东西进去也很难,东西越多越脏越乱。(媳妇没有好坏之分,只有适合不适合,还有看你怎么和媳妇相处了,互相了不了解,性格和脾气对不对路,文件系统也是如此)

    文件存储格式和文件系统是一样的原理,文件系统管理的是文件,而文件储存格式管理的是文件内容(管理的是文件中每一行每一列的具体内容)。所以低效率的文件存储格式就像是一个赖媳妇,家里被管的一塌糊涂,东西越多越脏乱差,高效率的文件存储格式就是勤快且聪明的媳妇,一切都管的井然有序,取东西方便,放东西也容易,还会根据不同的物品特征进行摆放,完美,6666!!!

    csv,txt,json等等都是懒媳妇,parquet,orc都是勤快媳妇,那为什么文本文件是懒媳妇,parquet是好媳妇,主要有以下几个原因:

    文本文件为什么不好?

    文本文件行存储,存储占用空间,而且读取数据的时候会读出很多不必要的数据出来,这就好像你叫懒媳妇给你拿一顶帽子,结果她把衣服,鞋子,袜子统统拿出来,然后再从里面挑出你要的帽子。

    parquet为什么好,Spark使用parquet文件存储格式意义在哪里?

    1) 如果说HDFS 是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准

    2) 速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况

    会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况

    下,使用parquet很多时候可以成功运行

    3) parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作

    (例如会导致lost task,lost executor)但是此时如果使用parquet就可以正常的完成

    4) 极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理

    数据的时候的数据输入内容,尤其是在spark1.6x中有个下推过滤器在一些情况下可以极大的

    减少磁盘的IO和内存的占用,(下推过滤器)

    5) spark 1.6x parquet方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu

    6) 采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径

    3.Spark调优经验

    3.1 Spark原理及调优工具

    · Spark Web UI界面

    · jstack、jstat、jprofile

    · history server:当Spark应用退出后,仍可以获得历史Spark应用的stages和tasks执行信息,便于分析程序不明原因挂掉的情况,Spark的history server依赖mr的history server。

    3.2 运行环境优化

    3.2.1 防止不必要的分发

    每个Application都会上传一个spark-assembly-x.x.x-SNAPSHOT-hadoopx.x.x-cdhx.x.x.jar的jar包,影响HDFS的性能以及占用HDFS的空间.对于用户的jar包,有时候体积也非常庞大,我们同样的方式上传hdfs上,然后直接使用。

    1. 依赖ja包重复上传

    执行spark任务有大量jar包上传HDFS,将系统jar包上传到hdfs上,直接使用hdfs上的文件,具体下:

    1)修改conf/spark-default.conf添加以下配置

    spark.yarn.jar hdfs://master:9000/system/spark/jars/spark-assembly-1.6.0-hadoop2.6.0.jar

    2)再次执行SparkPi,提交脚本发生了变化,如下:

    bin/spark-submit  --class  org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ lib/spark-examples*.jar  10

    2. 用户jar包重复上传,避免重复分发

    bin/spark-submit  --class  org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ hdfs://master:9000/user/spark/jars/spark-examples-1.6.0-hadoop2.6.0.jar 10

    3.2.2 提高数据本地性

       分布式数据并行环境下,保持数据的本地性是非常重要的内容,事关分布式系统性能高下,涉及到数据本地性的概念有block、partition、worker、rack。

    Spark中的数据本地性有三种:

    PROCESS_LOCAL是指读取缓存在本地节点的数据

    NODE_LOCAL是指读取本地节点

    RACK_LOCAL是指读非本机架的节点数据

    yarn和hfs尽可能的在一个节点上很多rack local,说明本地性很差,可以通过增加副本数来提升本地新。

    3.2.3 存储格式选择

    BAT等公司80%都是采用列式存储结构 ,相同的列存储在一起,只读取所需的列,io减少,相同的列存在一起,压缩比会非常高。大概是行存储的1/22个apache顶级项目ORC:源自于hive,建表最好都搞成orc,hive常用parquet,rdd读取效率低,spark sql高效率读取

    列式存储和行式存储相比有哪些优势呢?

    ·可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

    ·压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如Run Length Encoding和Delta Encoding)进一步节约存储空间。

    ·只读取需要的列,支持向量运算,能够获取更好的扫描性能。

    从上图可以很清楚地看到,行式存储下一张表的数据都是放在一起的,但列式存储下都被分开保存了。所以它们就有了如下这些优缺点:

    通过字典表压缩数据。为了方面后面的讲解,这部分也顺带提一下了。

    下面中才是那张表本来的样子。经过字典表进行数据压缩后,表中的字符串才都变成数字了。正因为每个字符串在字典表里只出现一次了,所以达到了压缩的目的(有点像规范化和非规范化Normalize和Denomalize)

    下面这个图,通过一条查询的执行过程说明列式存储(以及数据压缩)的优点:

    关键步骤如下:

    1.去字典表里找到字符串对应数字(只进行一次字符串比较)。

    2. 用数字去列表里匹配,匹配上的位置设为1。

    3. 把不同列的匹配结果进行位运算得到符合所有条件的记录下标。

    4.  使用这个下标组装出最终的结果集。

    3.2.4 选择高配机器

    随着硬件的不断发展和企业的需求的不断变化, 大部分企业在集群搭建初期和中期的集群配置都不一样。而Spark是一个非常消耗内存的,因此对于初期一些配置较低,尤其内存较差的机器,是不适合跑spark任务的,更加适合硬件配置高一点的机器上跑。机器配置的参差不齐,应该如何有区别的调度

    ,yarn提供了很好的解决方案。

    yarn支持标签,根据机器的配置,给机器打相应的标签,标签如何打不在讨论范围,目前只有capacity 调度算法支持标签给队列支持打标签,将标签和队列绑定在一起,将应用程序提交到指定标签的队列,执行的时候就会提交到到相应标签节点 。

    很多时候是基础平台的修改,运维负责优化 yarn基于标签的调度,haodop从hadoop2.6.0开始提供基于标签的调度策略。

    3.3 优化操作符

    3.3.1 过滤操作导致多小任务

    filter操作使用不当,很容易引发麻烦。假如一个任务有3个parition,经过filger过滤之后,可能导致部分剩下很少,有些剩余很多,剩余很多的在下一步计算量很大,会拖后腿,其他的作业很快就做完了,而剩余很多的要执行很长时间,整个任务都要延误,而其他很快执行完的作业早就释放资源了

    造成资源还的浪费

    对于这种场景有2种优化策略:

    1)coalses:合并已有的partiion,性能非常高,但是很有可能还不是很均与,

    大的依旧很大,小的进行了合并

    2)repartion:根据数据量灯亮划分,每个partion尽可能均匀,会经过一次shuffle比较均匀

    3.3.2 降低单条记录开销

     做过Java连接数据库操作的人都知道,要尽量避免数据库链接的频繁建立和断开,方法很多,比如数据库连接池的发明。单机版本对数据库的连接操作比较容易管理和控制,但在分布式环境下,数据库的连接管理和控制很麻烦,数据的连接是不可序列化的,因此分布式环境下,统一管理数据库连接显然是不靠谱的。比如这段代码,如果写数据到数据库,就会频繁建立和断开连接,显然是低效率的。因为数据库连接的不可序列化,你也不可能把conn拿出来。

    解决方法是:使用mapPartitions或者mapWith操作符

    原因在于mapPartitions是map的调用的粒度不同,map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。

     假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。在大数据集情况下的资源初始化开销和批处理处理,尤其数据库链接操作,显得特别好用。

    3.3.3 处理数据倾斜或者任务倾斜

        spark任务中的数据倾斜可能导致某一台节点超负荷运转、内存不足,其他节点都处于空闲等待,加内存不能解决问题。应该找到出问题的shuffle操作,修正它。具体问题具体分析,有如下个方向,但不限于此。

    1)改变数据结构,从源头调整数据的分布,例如分表,分区存放,横向或者纵向分表等。

    2)修改并行度

    · 改变并行度可以改善数据倾斜的原因是因为如果某个task有100个key并且数据巨大,那么有可能导致OOM或者任务运行缓慢;    ·此时如果把并行度变大,那么可以分解每个task的数据量,比如把该task分解给10个task, 那么每个task的数据量将变小,从而可以解决OOM或者任务执行慢.对应reduceByKey而言可以传入并行度参数也可以自定义partition.   · 增加并行度:改变计算资源并没有从根本上解决数据倾斜的问题,但是加快了任务运行的速度.   · 这是加入有倾斜的key, 加随机数前缀,reduceByKey聚合操作可以分而治之,产生的结果是代前缀的,因此需

    3)提取聚集,预操作join,  把倾斜数据在上游进行操作.

    4)局部聚合+全局聚合

    5)尽量避免shuffle

    6)启用推测执行,避免慢节点任务拖后腿,慢磁盘问题在hadoop集群运行了好几年之后非常明显,尤其是磁盘,hadoop以及很多监控工具并没有对慢磁盘进行监控,需要自己写脚本监控。

    下面的例子就是通过对key进行增加随机数,然后进行局部聚合+全局聚合

    3.3.4 复用RDD进行缓存

    RDD是一系列的数据+计算,每一次计算利用上次计算结果都会重新计算,对于一些常用的计算结果可以缓存起来,避免重复计算

     

    cache和persist的区别:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

    3.3.5 慎用耗资源操作符

         选择 Operator 方案的主要目标是减少 shuffle 的次数以及被 shuffle 的文件的大小。因为 shuffle 是最耗资源的操作,所以有 shuffle 的数据都需要写到磁盘并且通过网络传递,repartition,join,cogroup,以及任何 *By 或者 *ByKey 的 transformation 都需要 shuffle 数据。不是所有这些 Operator 都是平等的,但是有些常见的性能陷阱是需要注意的。消耗资源的操作尽量少用,能用小砍刀办到的事情,何须屠龙刀,Spark中比较消耗资源的操作有

    · 笛卡尔积操作

    · 带shuffle的各种算子

    如果可能,用treeReduce代替reduce,尽量用reduceByKey替代groupByKey,举个栗子

        尽量避免差生shuffle,什么时候不发生 Shuffle 当前一个 transformation 已经用相同的 patitioner 把数据分 patition 了,Spark知道如何避免 shuffle

     3.3.6 作业并行化

        Job之间如果没有依赖关系,在资源允许的情况下,当然是能并行就更佳,能高并发的,高吞吐量的时候那还要等什么,除非你不希望活早点干完。Job之间并行注意2点

    · 启动FAIR调度器:spark.scheduler.mode=fait

    · 将action相关操作放到单独线程中

    3.4 作业参数调优

    3.4.1 设置合适的资源量

    实际开发过程中不好把控需要启动多少个Executor,每个Executor多少个内存要在测试环境不断的尝试,每个应用程序都不一样,要根据实际针对性的调整。需要把握几点

    · Executor数目并非越多越好

    · Task数目不应小于Executor的core总数,要不然有些Executor就白启动了,跑空任务

    假设有1G的数据,默认8个task,是启动8个Executor,一个Executor1核,还是启动4个Executor,一个Executor2核,这个需要手动调试去观察和比较。要一个个的去试试才知道了,没有固定的方式方法。

    3.4.2 设置合理的JVM

    JVM调优用的比较多的是调整GC策略,如何调整垃圾的回收机制主要是Executor的垃圾回收机制。可以为spark定制与Hadoop不同的jdk版本,前提是各个节点要安装了 ,通过Spark Web UI可以查看GC消耗的时间。JVM参数设置主要有2个对象:driver和executor。并且提交的方式不一样,参数设置和程序读取的地方也不一样。

    1. Driver的JVM参数

    yarn-client模式: (1)-Xmx,-Xms,默认读取spark-env.sh,文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值一样大小;(2)PermSize,默认读取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;(3)GC方式,默认读取的是spark-class文件中的JAVA_OPTS

    yarn-cluster模式:(1) -Xmx,-Xms读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的JVM参数值;(2)PermSize,读取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的JVM参数值。(3)取的是spark-default.conf文件中的spark.driver.extraJavaOptions对应的参数值

    上值最后均可被spark-submit工具中的--driver-java-options参数覆盖。

    2.Executor的JVM参数

    -Xmx,-Xms,如果是yarn-client模式,则默认读取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值一样大小;如果是yarn-cluster模式,则读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。

    PermSize,两种模式都是读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。GC方式,两种模式都是读取的是spark-default.conf文件中的spark.executor.extraJavaOptions对应的JVM参数值。

    3.JVM参数说明

    -Xms:初始堆的大小,默认是物理内存的1/64

    -Xmx:最大堆大小,默认物理内存的1/4

    -Xmn:年轻代大小

    -XX:PermSize:持久化初始值,默认是物理内存的1/64

    -XX:MaxPermSize:设置持久代最大值,默认物理内存的1/4

    -XX:+UseConcMarkSweepGC,使用CMS内存收集

    3.4.3 启用更高效的序列化方法

      序列化常用于网络传输和数据持久化以便于存储和传输,分布式集群有大量的网络传输和数据存储,序列化的重要性不言而喻。

    如果序列化格式序列化过程缓慢,或者需要占用字节很多,都会大大拖慢整体的计算效率。通常,序列化都是Spark应用优化时首先需要关注的地方。Spark着眼于要达到便利性(允许你在计算过程中使用任何Java类型)和性能的一个平衡。Spark主要提供了两个序列化库:

    Spark主要提供了两个序列化库:

    Java serialization: 默认情况,Spark使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。同时,你还可以通过扩展 java.io.Externalizable 来控制序列化性能。Java序列化很灵活但性能较差,同时序列化后占用的字节数也较多。

    Kryo serialization: Spark还可以使用Kryo 库(版本2)提供更高效的序列化格式。Kryo的序列化速度和字节占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有实现了Serializable 接口的类型,它需要你在程序中 register 需要序列化的类型,以得到最佳性能。

    要切换到使用 Kryo,你可以在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。这个设置不仅控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。目前,Kryo不是默认的序列化格式,因为它需要你在使用前注册需要序列化的类型,不过我们还是建议在对网络敏感的应用场景下使用Kryo。

    3.4.4 增大off head内存

      spark为了引擎更高效,用了部分堆外内存,这种类型的内存用JVM的参数就很难控制住,内存不够会被杀掉。可能不是jvm的内存不够了,可能是对外内存不足,设置overhead参数

    memory*0.1。Spark On YARN,Executor经常被YARN杀掉报的错如下所示:

    · 解决方法:增大overhead内存大小,默认是内存的10%

    driver:spark.yarn.driver.memeoryOverhead

    executor:spark.yarn.executor.memoryOverhead

    3.4.5 shuffle参数调优

        1) shuffle实现的选择

    Hash-based Shuffle:每个executor产生R个文件

    Sorted-based Shuffle(默认实现):每个Map Task产生一个文件,更省内存的实现

    如何选择?reduce task数目较多时,选择sort-based实现,修改spark.shuffle.manager,选择hash或者sort,shuffle的详细可以参考上一篇文章

     

        

    2)默认情况下

    reduce task收到的数据会存到内存(HashTable)中,防止reduce task的OOM可以将spark.shuffle.spill设为true。

    spill条件,可通过spark.shuffle.memeoryFraction设置,默认是0.3。

    3.4.6 设置reduce task数目

      Reduce Task数目过小,运行过慢,且可能导致OOM

    Reduce Task数目过大,产生较多小任务,启动和调度开销增大

    显示设置reduce task数目,比如groupByKey,reduceByKey等均提供了设置参数

    默认修改参数值spark.default.parallelism,模式是跟前一个阶段一致。

    3.4.7 使用spark sql实现

      越来越多的程序会采用spark sql编写,而不是spark core API,原因是

    1)更加简单,DataFrame/Dataset是更高级的API,而RDD API是比较低级

    2)更加高效,spark sql自带了优化器,可以自动优化你的程序

    原文出处:https://www.cnblogs.com/licheng/p/6822435.html

    最新回复(0)