本文讲述shuffleReader的具体实现。从这篇文章中,我们已经知道shuffleReader是一个抽象类,该抽象类只有一个read函数,用来在shuffle阶段从本地或远程获取数据。
该抽象类的实现类是:BlockStoreShuffleReader。本文主要讲述该实现类的具体实现。
该类会实现ShuffleReader抽象类的read函数。该read函数的实现被封装在ShuffleBlockFetcherIterator这个迭代器类中。
该迭代器实现了从远程或本地读取shuffle数据块的具体逻辑,然后返回一个迭代器,供数据使用者读取数据。
该迭代器会调用initialize()函数来完成相关功能。在该函数中会调用splitLocalRemoteBlocks()函数来分割本地和远程的数据块。具体的实现逻辑如下: (1) 通过TaskContext.addTaskCompletionListener来添加一个回调函数clean,该回调函数会在迭代器僵死或停止时释放掉中间过程中用到的buffer的内存。 (2) 然后再调用splitLocalRemoteBlocks函数来,完成本地和远程数据块的分割。 (3) 把对远端的请求添加到fetchRequests队列中,顺序是随机的。 (4) 调用fetchUpToMaxBytes函数来发送获取数据块的请求,但获取数据块的总大小不能超过:maxBytesInFlight。这样设计是为了防止内存溢出。 (5) 获取本地数据块:调用fetchLocalBlocks函数来完成。在shuffle过程中,若数据块在本地,效率是最高的。
该函数的声明如下:
private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest]可以看到该函数其实是返回一个FetchRequest对象的数组。
FetchRequest类 FetchRequest是一个case class,它是向远端节点的BlockManager发送的获取数据块的请求类。该类的声明如下: case class FetchRequest(address: BlockManagerId, blocks: Seq[(BlockId, Long)]) { val size = blocks.map(_._2).sum }可以看到,该类包含了一个BlockManagerId类和数据块的id对象的列表:blocks。
在spark-2.4x中实现该函数时,有一些约定:远程请求的最大数据量为:maxBytesInFlight/5。保持它们小于maxBytesInFlight的原因是:允许最多5个节点来并行提取数据,而不是阻止从一个节点读取输出。
到此,该函数已经把需要fetch的数据块分开了,把数据块的信息分别保存到了本地和远程的一个set中。这样,在后续的数据块获取时,只需要遍历对应的set进行操作即可。
该函数向远端发送获取数据块的请求,请求的最大数据量为maxBytesInFlight。若无法立即从远程主机获取数据块,请求将被推迟到下次进行处理。
该函数先从延迟请求队列中获取上次没有处理完的请求,先处理延迟的请求。然后遍历请求队列,处理正常的数据块获取请求。
从以上代码可见,最终是调用sendRequest(req: FetchRequest) 函数来发送请求。
在spark中有几种实现shuffleClient的方式,会在BlockManager对象初始化时进行,代码片段如下:
private[spark] class BlockManager(...) { ... // 根据配置创建或获取数据传输服务对象 private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)) } else { blockTransferService } ... }通过以上代码可知,若是配置了参数spark.shuffle.service.enabled,就会创建ExternalShuffleClient类的对象,否则,会创建blockTransferService类的对象作为shuffleClient。blockTransferService传输服务的实现,目前采用的是Netty框架,通过该框架实现的传输服务类是:NettyBlockTransferService。
该类的主要任务是:通过netty服务一次向远端获取一组数据块。
init():服务初始化该函数用来初始化该类的一些变量,创建并启动传输服务。在创建BlockManager时,会调用该函数进行数据块传输服务的初始化。该函数的实现流程如下:
创建NettyBlockRpcServer类对象rpcHandler基于rpcHandler对象创建TransportContext对象transportContext,通过该对象可以用来创建数据传输的客户端和服务端调用createServer函数创建数据块传输服务,并启动该服务,具体步骤如下: 调用transportContext.createServer来创建数据块传输服务,其实就是创建一个TransportServer对象。 调用Utils.startServiceOnPort(_port, startService,…)在给定端口上启动该服务,若启动失败会根据参数spark.port.maxRetries的次数来进行尝试。shuffle传输服务的端口默认值是由参数:spark.blockManager.port配置的。 fetchBlocks函数的实现 该函数的实现逻辑如下: 创建一个新的RetryingBlockFetcher.BlockFetchStarter类,并定义createAndStart函数,然后会调用该函数来获取数据。createAndStart函数的实现代码如下: override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) { val client = clientFactory.createClient(host, port) new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempFileManager).start() } }从以上代码可见,该函数会创建一个rpc的client,然后创建一个OneForOneBlockFetcher类的对象,并调用start函数来完成数据的fetch。
start函数会调用 client.sendRpc来发送rpc请求,成功后调用fetchChunk或client.stream()来获取数据块。stream()是通过流的方式来传输,会使用远端的给定流ID来传输数据。而fetchChunk会对数据块从0进行编号,每次请求一个数据块,而一个数据块可能被请求多次,但流不支持这样的操作。 假设仅使用单一的TransportClient来获取数据块,那么:若同时请求多个数据块,这些请求将会被排队,而且会保证数据块按照请求的顺序返回。本文分析了spark-2.4中的shuffle reader的实现原理。