很久没有做技术方面的分享了,今天闲来有空写一篇关于Kafka通信方面的文章与大家共同学习。
说明:
Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor线程负责读写数据,M个Handler来处理业务逻辑。在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求。
下面我们就针对以上整体设计思路分开讲解各个不同部分的源代码。
说明:
ConnectionQuotas对象负责管理连接数/IP, 创建一个Acceptor侦听者线程,初始化N个Processor线程,processors是一个线程数组,可以作为线程池使用,默认是三个,Acceptor线程和N个Processor线程中每个线程都独立创建Selector.open()多路复用器,相关代码在下面:
val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue)); val serverChannel = openServerSocket(host, port);范围可以设定从1到Int的最大值。
此处采用的是同步非阻塞逻辑,每隔500MS轮询一次,关于同步非阻塞的知识点在http://www.jianshu.com/p/e9c6690c0737 当有请求到来的时候采用轮询的方式获取一个Processor线程处理请求,代码如下:
currentProcessor = (currentProcessor + 1) % processors.length之后将代码添加到newConnections队列之后返回,代码如下:
def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) wakeup()} //newConnections是一个线程安全的队列,存放SocketChannel通道 private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()先来重点看一下configureNewConnections这个方法:
private def configureNewConnections() { while(newConnections.size() > 0) { val channel = newConnections.poll() debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) channel.register(selector, SelectionKey.OP_READ) } }循环判断NewConnections的大小,如果有值则弹出,并且注册为OP_READ读事件。 再回到主逻辑看一下read方法。
def read(key: SelectionKey) { lruConnections.put(key, currentTimeNanos) val socketChannel = channelFor(key) var receive = key.attachment.asInstanceOf[Receive] if(key.attachment == null) { receive = new BoundedByteBufferReceive(maxRequestSize) key.attach(receive) } val read = receive.readFrom(socketChannel) val address = socketChannel.socket.getRemoteSocketAddress(); trace(read + " bytes read from " + address) if(read < 0) { close(key) } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req) key.attach(null) // explicitly reset interest ops to not READ, no need to wake up the selector just yet key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) } else { // more reading to be done trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) wakeup() } }说明
1、把当前SelectionKey和事件循环时间放入LRU映射表中,将来检查时回收连接资源。 2、建立BoundedByteBufferReceive对象,具体读取操作由这个对象的readFrom方法负责进行,返回读取的字节大小。
如果读取完成,则修改状态为receive.complete,并通过requestChannel.sendRequest(req)将封装好的Request对象放到RequestQueue队列中。如果没有读取完成,则让selector继续侦听OP_READ事件。说明
KafkaRequestHandler也是一个事件处理线程,不断的循环读取requestQueue队列中的Request请求数据,其中超时时间设置为300MS,并将请求发送到apis.handle方法中处理,并将请求响应结果放到responseQueue队列中去。 代码如下:
try{ trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) request.requestId match { case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) case RequestKeys.OffsetsKey => handleOffsetRequest(request) case RequestKeys.MetadataKey => handleTopicMetadataRequest(request) case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => request.requestObj.handleError(e, requestChannel, request) error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds }说明如下:
参数说明对应方法RequestKeys.ProduceKeyproducer请求ProducerRequestRequestKeys.FetchKeyconsumer请求FetchRequestRequestKeys.OffsetsKeytopic的offset请求OffsetRequestRequestKeys.MetadataKeytopic元数据请求TopicMetadataRequestRequestKeys.LeaderAndIsrKeyleader和isr信息更新请求LeaderAndIsrRequestRequestKeys.StopReplicaKey停止replica请求StopReplicaRequestRequestKeys.UpdateMetadataKey更新元数据请求UpdateMetadataRequestRequestKeys.ControlledShutdownKeycontrolledShutdown请求ControlledShutdownRequestRequestKeys.OffsetCommitKeycommitOffset请求OffsetCommitRequestRequestKeys.OffsetFetchKeyconsumer的offset请求OffsetFetchRequest我们回到Processor线程类中,processNewRequest()方法是发送请求,那么会调用processNewResponses()来处理Handler提供给客户端的Response,把requestChannel中responseQueue的Response取出来,注册OP_WRITE事件,将数据返回给客户端。
转载自 并发编程网 - ifeve.com 相关资源:敏捷开发V1.0.pptx