ShuffleClient不仅是将shuffle文件上传到其他Executor或者下载远程Executor文件到本地的客户端,也是提供可以被其他Executor访问的shuffle服务。
Spark任务是分布式计算的,每个Task运行在不同的机器上。map任务执行完成后会将输出结果存储到任务执行所在的机器的本地存储中,并通过本地的MapOutputTrackerWorker向Driver中DAGScheduler里的MapOutputTrackerMaster汇报。reduce任务很可能和map任务不在同一台机器上执行,reduce任务执行前首先会向本地MapOutputTrackerWorker请求map任务输出结果(如果没有则其向Driver中DAGScheduler里的MapOutputTrackerMaster获取)。得到地址后,reduce任务会通过ShuffleClient远程下载map任务的中间输出。因此,ShuffleClient是Spark计算框架中的一个重要组件。
ShuffleClient的UML类图如下:
ShuffleClient是在BlockManager中创建的,根据spark.shuffle.service.enabled配置来决定是否启用外部服务作为shuffle服务,默认使用构造函数中传入的blockTransferService:
// BlockManager.scala private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) // 读取其他executor的shuffle文件的客户端。它或者是一个外部服务,或者仅仅是标准的BlockTransferService // 直接地连接到其他executors。 private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores) new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled()) } else { blockTransferService }BlockManager是在SparkEnv中创建的,并同时创建了blockTransferService服务并将其传入BlockManager的构造函数中。blockTransferService的实际实现类为NettyBlockTransferService:Spark和Hadoop一样,都采用Netty作为shuffle服务。下面我们以默认的NettyBlockTransferService为例进行介绍。
// SparkEnv.scala val blockTransferService = new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, blockManagerPort, numUsableCores) val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver) // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores首先看下ShuffleClient接口类的定义如下。ShuffleClient定义了从executor或者外部服务读取shuffle文件数据的接口。
/** 提供了一个从executor或者外部服务读取shuffle文件的接口。 */ public abstract class ShuffleClient implements Closeable { /** * 初始化ShuffleClient,指定当前executor的appId。 * 必须在ShuffleClient其他方法调用前被调用。 */ public void init(String appId) { } /** * 从远程节点异步获取一组blocks * * 注意:该API接口接收数组参数,所以可以实现批量请求。另外,该方法没有返回一个future对象,所以子类 * 实现可以在一个block获取成功后立即回调onBlockFetchSuccess,而不是等待所有的blocks都获取成功。 */ public abstract void fetchBlocks( String host, int port, String execId, String[] blockIds, BlockFetchingListener listener, TempShuffleFileManager tempShuffleFileManager); }BlockTransferService是继承自ShuffleClient接口的抽象类,负责数据的传输。其中定义了blocks的批量获取、单个获取和单个同步或异步上传的接口。类的定义如下:
private[spark] abstract class BlockTransferService extends ShuffleClient with Closeable with Logging { // 通过提供可以用来获取和保存本地blocks块的BlockDataManager来初始化传输服务。 def init(blockDataManager: BlockDataManager): Unit // 关闭服务 def close(): Unit // 服务监听的端口号,只有在[[init]]调用后才可用。 def port: Int // 服务监听的主机名,只有在[[init]]调用后才可用。 def hostName: String /** * 从远程节点异步获取一组blocks * * 注意:该API接口接收数组接口,所以可以实现批量请求。另外,该方法没有返回一个future对象,所以子类 * 实现可以在一个block获取成功后立即回调onBlockFetchSuccess,而不是等待所有的blocks都获取成功。 */ override def fetchBlocks( host: String, port: Int, execId: String, blockIds: Array[String], listener: BlockFetchingListener, shuffleFiles: Array[File]): Unit // 上传单个block块到远程节点,仅在[[init]]之后才可使用。 def uploadBlock( hostname: String, port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, level: StorageLevel, classTag: ClassTag[_]): Future[Unit] /** * 一个特殊的[[fetchBlocks]]的例子,它阻塞式地读取一个block块。 * 只有在调用[[init]]后才可以使用它。 */ def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = { // 监控等待的线程. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), new BlockFetchingListener { override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = { result.failure(exception) } override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = { val ret = ByteBuffer.allocate(data.size.toInt) ret.put(data.nioByteBuffer()) ret.flip() result.success(new NioManagedBuffer(ret)) } }, shuffleFiles = null) ThreadUtils.awaitResult(result.future, Duration.Inf) } /** * 上传单个block块到远程节点,仅在[[init]]调用之后才可用。 * 该方法类似于[[uploadBlock]]方法,除了该方法会阻塞线程直到block上传完成。 */ def uploadBlockSync( hostname: String, port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, level: StorageLevel, classTag: ClassTag[_]): Unit = { val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag) ThreadUtils.awaitResult(future, Duration.Inf) } }BlockTransferService的实现类实现为NettyBlockTransferService,它使用Netty异步时间驱动的网络应用框架,获取和上传远程节点上的Block集合。
NettyBlockTransferService的各项功能只能在init方法执行完成后才可用,初始化包含四大部分:
创建RPC服务RpcServer(实现子类NettyBlockRpcServer);
构造传输上下文TransportContext;
创建RPC客户端工厂TransportClientFactory;
创建Netty服务器TransportServer;
// NettyBlockTransferService.scala override def init(blockDataManager: BlockDataManager): Unit = { // 1.创建RpcServer; val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager) var serverBootstrap: Option[TransportServerBootstrap] = None var clientBootstrap: Option[TransportClientBootstrap] = None if (authEnabled) { serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager)) clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager)) } // 2.构建TransportContext; transportContext = new TransportContext(transportConf, rpcHandler) clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava) server = createServer(serverBootstrap.toList) appId = conf.getAppId logInfo(s"Server created on ${hostName}:${server.getPort}") }当map任务与reduce任务处于不同节点时,reduce任务需要从远端节点下载map任务的中间输出。NettyBlockRpcServer提供了下载Block文件的功能,并为了容错需要将Block的数据备份到其他节点上,还提供了上传Block文件的RPC服务。
/** * 服务通过为每个block请求简单地注册一个chunk来打开block块。用 * 处理打开及上传BlockManager管理的任意blocks块。 * * 打开的blocks被通过"一对一"的策略进行注册,意味着每个传输层的chunk对应一个Spark层的shuffle块。 */ class NettyBlockRpcServer( appId: String, serializer: Serializer, blockManager: BlockDataManager) extends RpcHandler with Logging {...}TransportContext用于维护传输上下文,可以创建Netty服务和Netty访问的客户端。TransportContext的组成如下:
TransportConf:主要控制Netty框架提供的shuffle的I/O交互的客户端和服务端线程数量;
RpcHandler:负责shuffle的I/O服务端在接收到客户端的RPC请求后,提供打开Block或者上传Block的RPC处理,此处即为NettyBlockRpcServer;
decoder:在shuffle的I/O服务端对客户端传来的ByteBuf进行解析,防止丢包和解析错误;
encoder:在shuffle的I/O客户端对消息内容进行编码,防止服务端丢包和解析错误;
/** * 包含创建TransportServer和TransportClientFactory的上下文信息, * 并用`org.apache.spark.network.server.TransportChannelHandler`来设置Netty Channel管道。 * * TransportClient提供了两个通信协议:控制层面的RPCs和数据层面的"chunk fetching"。 * RPCs的处理是在TransportContext范围之外执行的(例如,通过一个用户提供的处理方法)。 * 并且,它负责设置流,这些流可以利用零拷贝IO在chunks中来流式传输数据。 * * TransportServer和TransportClientFactory为每个通道创建一个TransportChannelHandler。 * 由于每个TransportChannelHandler都包含TransportClient, * 这使得服务器进程可以在现有channel通道上将消息发送回客户端。 */ public class TransportContext { private final TransportConf conf; private final RpcHandler rpcHandler; /** * 在类加载器切换为ExecutorClassLoader之前,强制创建MessageEncoder和MessageDecoder * (See SPARK-17714) */ private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE; private static final MessageDecoder DECODER = MessageDecoder.INSTANCE; /** * 初始化一个ClientFactory,它在返回新客户端之前运行给定的TransportClientBootstraps。 * Bootstraps将同步执行,并且必须运行成功才能创建客户端。 */ public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) { return new TransportClientFactory(this, bootstraps); } public TransportClientFactory createClientFactory() { return createClientFactory(new ArrayList<>()); } /** 创建尝试绑定到特定端口的服务器。 */ public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) { return new TransportServer(this, null, port, rpcHandler, bootstraps); } ... }为什么需要MessageEncoder和MessageDecoder?因为在基于流的传输里(比如TCP/IP),接收到的数据首先会被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使发送了2个独立的数据包,操作系统也不会作为2个消息处理,而仅仅认为是一连串的字节。因此不能保证远程写入的数据会被准确地解析。因此,接收方不管是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意义并且让程序的逻辑更好理解的数据。
TransportClientFactory由TransportContext的createClientFactory方法创建,是创建Netty客户端TransportClient的工厂类,用于向Netty服务端发送RPC请求。TransportClientFactory维护了一个client数组,当所需的客户端不存在的时候,创建一个新的网络连接,然后将连接保存到client数组中。其由以下部分组成:
clientBootstraps:用于缓存客户端列表;connectionPool:用于缓存客户端连接;numConnectionsPerPeer:节点之间取数据的连接数,可以使用属性spark.shuffle.io.numConnectionsPerPeer来配置,默认为1;socketChannelClass:客户端channel被创建时使用的类,可以使用属性spark.shuffle.io.mode来配置,默认为NioSocketChannel;workerGroup:根据Netty的规范,客户端只有work组,所以此处创建workerGroup,实际是NioEventLoopGroup;pooledAllocator:汇集ByteBuf但对本地线程缓存禁用的分配器。TransportServer由TransportContext的createServer方法创建,提供了Netty实现的服务器端,用于提供RPC服务(比如上传、下载等)。
NettyBlockTransferService作为shuffle客户端,其fetchBlocks方法可获取远程shuffle文件,实际是利用NettyBlockTransferService中创建的Netty服务。NettyBlockTransferService提供了非重试版本OneForOneBlockFetcher和重试版本RetryingBlockFetcher的BlockFetcher,重试次数通过参数spark.[module].io.maxRetries进行配置,默认是重试3次。
/** * NettyBlockTransferService的fetchBlocks方法利用netty服务获取远程shuffle文件。 * 需要用主机名称,端口号,excutorid和blockids */ override def fetchBlocks( host: String, port: Int, execId: String, blockIds: Array[String], listener: BlockFetchingListener, tempShuffleFileManager: TempShuffleFileManager): Unit = { logTrace(s"Fetch blocks from h o s t : host: host:port (executor id $execId)") try { val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) { // clientFactory维护了一个client数组,获取或者创建一个与目标主机和端口匹配的socket连接 val client = clientFactory.createClient(host, port) new OneForOneBlockFetcher(client, appId, execId, blockIds, listener, transportConf, tempShuffleFileManager).start() } }
val maxRetries = transportConf.maxIORetries() if (maxRetries > 0) { // 注意这个Fetcher会正确处理maxRetries == 0;我们判断避免它是以防代码中有bug。 // 一旦我们确定代码的稳定性,我们将会删除该if语句。 new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start() } else { blockFetchStarter.createAndStart(blockIds, listener) } } catch { case e: Exception => logError("Exception while beginning fetchBlocks", e) blockIds.foreach(listener.onBlockFetchFailure(_, e)) } }NettyBlockTransferService本身也利用创建的Netty服务实现uploadBlock方法为其他远程Executor提供Block的读取服务,其实现流程如下:
创建Netty服务的客户端;将Block的存储级别StorageLevel和ClassTag序列化;将Block的字节数据ByteBuffer转化为数组,以便于序列化传输;将由appId、execId、blockId、序列化的StorageLevel和ClassTag、以及转换为数组的Block封装的UploadBlock消息序列化为字节数组;最终调用Netty客户端的sendRpc方法将字节数组上传,回调函数RpcResponseCallback根据RPC的结果更改上传状态。override def uploadBlock( hostname: String, port: Int, execId: String, blockId: BlockId, blockData: ManagedBuffer, level: StorageLevel, classTag: ClassTag[_]): Future[Unit] = { val result = PromiseUnit val client = clientFactory.createClient(hostname, port)
// 使用JavaSerializer将StorageLevel和ClassTag序列化为字节。 // 其他所有内容都使用我们的二进制协议编码。 val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag))) // 将nio缓冲区转换或复制到数组中以对其进行序列化。 val array = JavaUtils.bufferToArray(blockData.nioByteBuffer()) // 通过Client发送UploadBlock消息 client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer, new RpcResponseCallback { override def onSuccess(response: ByteBuffer): Unit = { logTrace(s"Successfully uploaded block $blockId") result.success((): Unit) } override def onFailure(e: Throwable): Unit = { logError(s"Error while uploading block $blockId", e) result.failure(e) } }) result.future }