When Kafka sends a message, the producer does not transmit it immediately: the message is first placed in a buffer, and the Sender thread later sends it out. This buffer is the RecordAccumulator, which wraps RecordBatch objects to store messages. Its main fields are:
```java
public final class RecordAccumulator {
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    // The ByteBuffer size of each RecordBatch.
    private final int batchSize;
    private final CompressionType compression;
    private final long lingerMs;
    private final long retryBackoffMs;
    // The BufferPool from which batch buffers are allocated.
    private final BufferPool free;
    private final Time time;
    // Mapping from TopicPartition to its RecordBatch queue: each partition of a topic
    // has its own Deque<RecordBatch>.
    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
    // RecordBatches that have not finished sending, backed by a Set<RecordBatch>.
    private final IncompleteRecordBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    // The set of muted TopicPartitions.
    private final Set<TopicPartition> muted;
    private int drainIndex;
}
```
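Since batches is keyed by TopicPartition, every append first has to locate, or lazily create, the deque for its partition. Below is a minimal sketch of what the getOrCreateDeque helper used by append could look like, assuming batches is backed by a ConcurrentHashMap; the real implementation may differ in detail.

```java
// Sketch only: lazily create the per-partition deque, tolerating races between appending threads.
private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    // If another thread inserted a deque first, use that one and discard ours.
    return previous != null ? previous : d;
}
```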
When the Producer calls send(), the net effect is that the message is appended to the RecordAccumulator:

```java
public final class RecordAccumulator {
    public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value,
                                     Callback callback, long maxTimeToBlock) throws InterruptedException {
        // Track the number of threads appending to the RecordAccumulator.
        appendsInProgress.incrementAndGet();
        try {
            // Find the RecordBatch queue for this TopicPartition, creating it if absent.
            Deque<RecordBatch> dq = getOrCreateDeque(tp);
            // Lock the deque and try to append by calling the RecordAccumulator#tryAppend
            // described earlier; on success, return directly.
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
            // The append failed; allocate new memory from the BufferPool.
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Check whether the producer has been closed in the meantime.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                // Retry tryAppend: the RecordAccumulator may have gained free space by now.
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    // The append succeeded, so return the result and release the buffer just allocated.
                    free.deallocate(buffer);
                    return appendResult;
                }
                // Still no room: create a new MemoryRecords/RecordBatch and add it to batches.
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                // Append the message, register the batch in the incomplete set,
                // and return the RecordAppendResult.
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
}
```
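The RecordAppendResult returned above carries the two booleans passed to its constructor: whether the batch is full and whether a new batch was created. A hedged sketch of how the producer's send path might use them to decide whether to wake the Sender thread, assuming those constructor arguments are exposed as fields batchIsFull and newBatchCreated (variable names are illustrative, not the verbatim KafkaProducer code):

```java
// Sketch: append to the accumulator, then wake the Sender if a batch filled up
// or a new batch was created (both suggest there is data worth draining).
RecordAccumulator.RecordAppendResult result =
        accumulator.append(tp, timestamp, serializedKey, serializedValue, callback, maxBlockTimeMs);
if (result.batchIsFull || result.newBatchCreated)
    this.sender.wakeup();
return result.future;
```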
After a message has been appended to the RecordAccumulator, the producer decides whether the conditions for waking the Sender thread are met; the check itself is implemented by the ready() method:

```java
public final class RecordAccumulator {
    public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        // Records which nodes can be sent data.
        Set<Node> readyNodes = new HashSet<>();
        // Records the delay until the next time ready() needs to be called.
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        // Whether any partition has no known leader.
        boolean unknownLeadersExist = false;
        // exhausted records whether other threads are waiting for BufferPool space to be freed;
        // it is one of the conditions for waking the Sender thread.
        boolean exhausted = this.free.queued() > 0;
        // Iterate over the batches map and evaluate the node hosting each partition's leader.
        for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<RecordBatch> deque = entry.getValue();
            Node leader = cluster.leaderFor(part);
            if (leader == null) {
                // No leader means we cannot send; setting unknownLeadersExist to true
                // triggers a metadata update.
                unknownLeadersExist = true;
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                synchronized (deque) {
                    // Only look at the first RecordBatch in the deque.
                    RecordBatch batch = deque.peekFirst();
                    if (batch != null) {
                        // Evaluate the conditions for waking the Sender thread.
                        boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                        long waitedTimeMs = nowMs - batch.lastAttemptMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // The deque holds more than one RecordBatch, or the first one is full.
                        boolean full = deque.size() > 1 || batch.records.isFull();
                        // The batch has waited long enough.
                        boolean expired = waitedTimeMs >= timeToWaitMs;
                        boolean sendable = full || expired
                                // Other threads are waiting for BufferPool space to be freed.
                                || exhausted
                                // The producer is being closed.
                                || closed
                                // A thread is waiting for a flush to complete.
                                || flushInProgress();
                        if (sendable && !backingOff) {
                            // The node satisfies the conditions; add it to readyNodes.
                            readyNodes.add(leader);
                        } else {
                            // Record the delay until ready() needs to be called again.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
    }
}
```
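On the Sender side, the ReadyCheckResult drives both metadata refreshes and connection checks. A simplified sketch based only on the fields shown above, where this.metadata and this.client stand in for the Sender's Metadata and NetworkClient (not the verbatim Sender code):

```java
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist) {
    // Some partition has no known leader, so force a metadata refresh.
    this.metadata.requestUpdate();
}
// Drop ready nodes we cannot actually send to yet (e.g., no established connection).
Iterator<Node> iter = result.readyNodes.iterator();
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now))
        iter.remove();
}
```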
The RecordAccumulator.drain method is called by the Sender thread. Based on the set of ready nodes obtained above, it gathers the messages to send and builds a Map<Integer, List<RecordBatch>> keyed by node id:

```java
public final class RecordAccumulator {
    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();
        Map<Integer, List<RecordBatch>> batches = new HashMap<>();
        for (Node node : nodes) {
            int size = 0;
            // The partitions whose leader resides on this node.
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<RecordBatch> ready = new ArrayList<>();
            // To prevent starvation when draining batches, drainIndex remembers where the
            // previous drain stopped, and the next drain resumes from that position.
            int start = drainIndex = drainIndex % parts.size();
            do {
                // Get the details of this partition.
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                // Only proceed if the partition has no in-flight batches.
                if (!muted.contains(tp)) {
                    // Get the RecordBatch queue for this partition.
                    Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                    if (deque != null) {
                        synchronized (deque) {
                            // Look at the first RecordBatch in the queue.
                            RecordBatch first = deque.peekFirst();
                            if (first != null) {
                                boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                                // Only drain the batch if it is not during backoff period.
                                if (!backoff) {
                                    if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                        // The request is full; stop collecting batches.
                                        break;
                                    } else {
                                        // Take one RecordBatch from the queue and add it to the ready list.
                                        // At most one RecordBatch is drained per TopicPartition.
                                        RecordBatch batch = deque.pollFirst();
                                        // Close the output stream, making the MemoryRecords read-only.
                                        batch.records.close();
                                        size += batch.records.sizeInBytes();
                                        ready.add(batch);
                                        batch.drainedMs = now;
                                    }
                                }
                            }
                        }
                    }
                }
                // Advance drainIndex.
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            // Record the mapping from node id to its RecordBatch list.
            batches.put(node.id(), ready);
        }
        return batches;
    }
}
```

As the code shows, each call takes at most one RecordBatch from each partition's queue into the ready list, which prevents starvation.
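To make the drainIndex round-robin concrete, here is a standalone toy loop (not Kafka code) showing how persisting the index between calls rotates the starting partition:

```java
public class DrainIndexDemo {
    public static void main(String[] args) {
        int drainIndex = 1;       // e.g., left over from the previous drain() call
        int partitionCount = 3;
        // Same loop shape as drain(): start where the previous call stopped,
        // wrap around with the modulo, and stop after one full rotation.
        int start = drainIndex = drainIndex % partitionCount;
        do {
            System.out.println("visit partition " + drainIndex); // prints 1, 2, 0
            drainIndex = (drainIndex + 1) % partitionCount;
        } while (start != drainIndex);
    }
}
```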