BufferPool的创建和释放都是比较消耗资源的,Kafka客户端使用BufferPool来实现ByteBuffer的复用。BufferPool主要包括两个部分,已用空间和未用的空间,总大小由buffer.memory配置。 未用的空间又分为已经申请但是没有使用的(free队列),和没有申请的空间(availableMemory)。它主要字段为:
public final class BufferPool { // 整个Pooll的大小 private final long totalMemory; // 指定ByteBuffer的大小 private final int poolableSize; // 多线程并发分配和回收ByteBuffer,用锁控制并发。 private final ReentrantLock lock; // 缓存了指定大小的ByteBuffer对象(poolableSize指定) private final Deque<ByteBuffer> free; // 记录因申请不到足够空间而阻塞的线程,此队列中实际记录的是阻塞线程对应的Condition对象。 private final Deque<Condition> waiters; // 可用的空间大小,totalMemory减去free列表中所有ButeBuffer的大小。 private long availableMemory; private final Metrics metrics; private final Time time; private final Sensor waitTime; }每个BufferPool对象值针对指定大小的ByteBuffer进行管理(poolableSize指定)。一般情况下,调整MemoryRecord的大小让其缓存多条消息(batch.size)。当一条消息大于MemoryRecord的大小时,就不会复用BufferPool中的ByteBuffer,而是额外分配新的ByteBuffer。用完后由GC直接回收,如果经常出现这种情况,就需要调整batch.size。
BufferPool最重要的函数是allocate申请空间。分配内存的由RecordAccumulator调用,大小制定为Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)),即batchs.size和实际消息大小中较大的一个,batchs.size为poolableSize。
public final class BufferPool { public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { // 消息大小大于totalMemory,直接抛异常 if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); this.lock.lock(); try { // 如果请求的是poolableSize大小的ByteBuffer,且free中有空余的ByteBuffer。 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // 计算出已经分配的free所有空间,加上可用空间的大小,大于size则可以进行分配 int freeListSize = this.free.size() * this.poolableSize; if (this.availableMemory + freeListSize >= size) { /* 需要availableMemory大于size才进行分配,否者一直循环等待free释放空间。 private void freeUp(int size) { while (!this.free.isEmpty() && this.availableMemory < size) this.availableMemory += this.free.pollLast().capacity(); } */ freeUp(size); // 可以分配 this.availableMemory -= size; lock.unlock(); // java.nio.ByteBuffer#allocate分配size大小的buffer return ByteBuffer.allocate(size); } else { // 没有足够的空间,只能阻塞直到已经分配的buffer释放了。 int accumulated = 0; ByteBuffer buffer = null; Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); // 把Condition加入到waiters中。 this.waiters.addLast(moreMemory); // 循环等待,直到能分配空间 while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } if (waitingTimeElapsed) { this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // 如果当前累计已经分配内存是0,即还没有分配,且请求是poolableSize大小的ByteBuffer,且free中有,则直接分配。 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { buffer = this.free.pollFirst(); accumulated = size; } else { // 否者等待size - accumulated的内存释放。 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); this.availableMemory -= got; accumulated += got; } } // 已经成功分配空间,则移除Condition Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) throw new IllegalStateException("Wrong condition: this shouldn't happen."); // 如果还有其他空间,则唤醒下一个线程。 if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } // unlock and return the buffer lock.unlock(); if (buffer == null) return ByteBuffer.allocate(size); else return buffer; } } finally { if (lock.isHeldByCurrentThread()) lock.unlock(); } } }释放空间:
public final class BufferPool { public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { // 如果释放的ByteBuffer大小是poolableSize,放入free中进行管理。 if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { // 释放的大小不是poolableSize,则直接用GC回收。 this.availableMemory += size; } // 唤醒一个空间不足阻塞的线程。 Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } } }1. BufferPool是线程安全的,用一个ReentrantLock来保证。 2. BufferPool只针对特定大小的ByteBuffer进行管理,对于其它大小的并不会缓存进来。因此如果超大消息比较多(大于poolableSize),就不会很好的利用内存池,频繁的申请回收内存效率会降低,并可能带来Full GC或者Out Of Memory Error,这个时候就要调整好batch.size的配置了。