Spark Core:第三章 spark shuffle

    xiaoxiao2025-04-18  5

    Spark Core:第三章 spark shuffle


    文章目录

    Spark Core:第三章 spark shuffle一、HashShuffle1. 普通机制2. 合并机制 二、SortShuffle1. 普通机制2. bypass机制 三、Shuffle 文件寻址1. 对象2. 过程 四、Spark 内存管理1. 静态内存管理2. 统一内存管理 五、Spark Shuffle调优


    —>Spark知识点总结导航<—


    一、HashShuffle

    1. 普通机制

     (1) 产生磁盘小文件数量    M(map task)*R(reduce task)

     (2) 过程问题

     (3) 产生磁盘小,文件多的问题    ① shuffle write 对象多

       ② shffle read 对象多

       ③ 对象多内存不足就会有gc,gc还不满足内存使用就会OOM

       ④ shuffle 磁盘小文件多,会造成节点之间的连接多,连接多容易不稳定

    2. 合并机制

     (1) 产生磁盘小文件数量

     (2) 过程    ① Map task 处理完数据之后,写到buffer缓存区,buffer大小是32K,个数与reduce task一致

       ② Executor中每个core中的task 共用一份buffer缓存区

       ③ 每个buffer缓存区满32K 溢写磁盘,每个buffer 最终对应一个磁盘小文件

       ④ reduce task 拉取数据


    二、SortShuffle

    1. 普通机制

     (1) 产生磁盘小文件数量    2*M( map task)

     (2) 过程

    2. bypass机制

     (1) 产生磁盘小文件数量    2*M( map task)

     (2) 过程    ① map task处理完数据之后,首先写入一个5M的数据结构

       ② sortShuffle 有不定期估算机制,来估算这个内存结构大小,当估算超过真实大小,会申请内存:2*估算-当前

       ③ 申请到内存继续写入内存数据机构,申请不到会溢写磁盘

       ④ 溢写磁盘过程中不会有排序,每批次1万条溢写,最终对应两个磁盘文件,一个索引文件,一个数据文件

       ⑤ reduce task 拉取数据首先读取索引文件,再拉取数据


    三、Shuffle 文件寻址

    1. 对象

    MapOutputTracker 管理磁盘小文件 BlockManager 块管理 BlockManagerMaster(Driver) DiskStore 管理磁盘数据 MemoryStore 管理内存数据 ConnectionManager 负责连接其他BlockManager BlockTransferService 负责拉取数据 BlockManagerWorker(Executor) DiskStore 管理磁盘数据 MemoryStore 管理内存数据 ConnectionManager 负责连接其他BlockManager BlockTransferService 负责拉取数据

    2. 过程

       ① map task 处理完数据之后,将数据结果和落地的磁盘位置封装到MapStatus对象中,通过Worker中的MapOutputTrackerWorker汇报给Driver中的MapOutputTrackerMaster,Driver掌握的磁盘小文件的位置

       ② reduce task处理数据之前先向Driver中MapOutputTrackerMaster 要磁盘小文件的位置信心,Driver返回

       ③ reduce 端通过BlockManager 中的ConnectionManager 连接数据所在节点的BlockManager

       ④ 连接上之后,通过BlockManager中的BlockTransferService 默认启动5个子线程去拉取数据,默认一次拉取的数据量不能超过48M

       ⑤ 拉取过来的数据放在Executor中的Shuffle内存中(spark.shuffle.memoryFraction 0.2)


    四、Spark 内存管理

    1. 静态内存管理

    0.2 task 运行 0.2--spark.shuffle.memoryFraction 0.2 0.2 预留 0.8 shuffle聚合内存 0.6--spark.storage.memoryFraction 0.6 0.1 预留 0.9 0.2 反序列化数据 --spark.storage.unrollFraction 0.2 0.8 RDD的缓存和广播变量

    2. 统一内存管理

    300M预留 (总-300M) 0.25 task运行 0.75--spark.memory.fraction 0.75 0.5 shuffle聚合内存 0.5 RDD缓存和广播变量 --spark.memory.storageFraction 0.5

    五、Spark Shuffle调优

    spark.shuffle.file.buffer 32k spark.reducer.maxSizeInFlight 48M spark.shuffle.io.maxRetries 3 spark.shuffle.io.retryWait 5s spark.shuffle.memoryFraction 0.2 spark.shuffle.manager hash|sort spark.shuffle.sort.bypassMergeThreshold 200----针对SortShuffle spark.shuffle.consolidateFiles ----针对HashShuffle

    参数设置

    在代码中设置 conf.set(k,v) (不建议) --最高在spark-defaults.conf中配置 (不建议) --第三在提交任务时设置 ./spark-submit --conf xxx=xxx --第二
    --->有问题请联系QQ1436281495^_^
    最新回复(0)