LogManager
LogManager会管理broker上所有的logs(在一个log目录下),一个topic的一个partition对应于一个log(一个log子目录) 首先loadLogs会加载每个partition所对应的log对象, 然后提供createLog,getLog,deleteLog之类的管理接口 并且会创建些后台线程来进行,cleanup,flush,checkpoint生成之类的工作
Log
Log只是对于LogSegments的封装,包含loadSegments,append(到active segment),read(需要定位到相应的segment)
LogSegment
Segment是个逻辑概念,为了防止log文件过大, 将log分成许多的LogSegments Segment又分为两部分,MessageSet文件和Index文件,分别命名为[base_offset].log和[base_offset].index base_offset就是该Segment的起始offset,比前一个segment里面的offset都要大
Segment提供对于MessageSet的读写接口 写,需要间隔的更新index文件,应该为了尽量减小index的size,所以只是当写入数据大于indexIntervalBytes时,才增加一条索引 读,由于user传入的是逻辑offest,需要先转化为物理地址才能从文件中读到数据,如何转化参考下面
同时index文件是可以根据MessageSet文件重新rebuild的
FileMessageSet
Segment中实际存放log message的文件,通过FileChannel可以读写文件
1: /** 2: * An on-disk message set. An optional start and end position can be applied to the message set 3: * which will allow slicing a subset of the file. 4: * @param file The file name for the underlying log data 5: * @param channel the underlying file channel used 6: * @param start A lower bound on the absolute position in the file from which the message set begins 7: * @param end The upper bound on the absolute position in the file at which the message set ends 8: * @param isSlice Should the start and end parameters be used for slicing? 9: */ 10: @nonthreadsafe 11: class FileMessageSet private[kafka](@volatile var file: File, 12: private[log] val channel: FileChannel, 13: private[log] val start: Int, 14: private[log] val end: Int, 15: isSlice: Boolean) extends MessageSet with Logging {...}OffsetIndex
Segment的index文件, 这是0.8后加上的,之前message直接使用物理offset标识 新版本中还是改成了使用逻辑offset,让物理地址对用户透明, 这样就需要一个index来匹配逻辑offset和物理地址 index考虑到效率,最好放在内存中,但是考虑到size问题, 所以使用MappedByteBuffer(参考,Java RandomAccessFile用法 ) 注释里面说, Index是sparse的,不保证每个message在index都有索引的entry Index由entry组成,每个entry为8-byte,逻辑offset4-byte,物理地址4-byte 并且逻辑offset是基于base offset的相对offset,否则无法保证只使用4-byte
1: /** 2: * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: 3: * that is it may not hold an entry for all messages in the log. 4: * 5: * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries. 6: * 7: * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant 8: * to locate the offset/location pair for the greatest offset less than or equal to the target offset. 9: * 10: * Index files can be opened in two ways: either as an empty, mutable index that allows appends or 11: * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an 12: * immutable one and truncate off any extra bytes. This is done when the index file is rolled over. 13: * 14: * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt. 15: * 16: * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the 17: * message with that offset. The offset stored is relative to the base offset of the index file. So, for example, 18: * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way let's us use 19: * only 4 bytes for the offset. 20: * 21: * The frequency of entries is up to the user of this class. 22: * 23: * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 24: * storage format. 25: */ 26: class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging { 27: private val lock = new ReentrantLock //操作index文件需要加锁 28: 29: /* initialize the memory mapping for this index */ 30: private var mmap: MappedByteBuffer = //使用MappedByteBuffer来操作index文件以应对大文件 31: { 32: val newlyCreated = file.createNewFile() 33: val raf = new RandomAccessFile(file, "rw") 34: val len = raf.length() 35: val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len) 36: } 37: 38: //通过byte偏移从buffer中读出某个entry的内容,offset和physical地址 39: /* return the nth offset relative to the base offset */ 40: private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8) 41: /* return the nth physical position */ 42: private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4) 43: 44: //通过二分查找找到targetOffset或最接近的offset(less than) 45: /** 46: * Find the largest offset less than or equal to the given targetOffset 47: * and return a pair holding this offset and it's corresponding physical file position. 48: * 49: * @param targetOffset The offset to look up. 50: * 51: * @return The offset found and the corresponding file position for this offset. 52: * If the target offset is smaller than the least entry in the index (or the index is empty), 53: * the pair (baseOffset, 0) is returned. 54: */ 55: def lookup(targetOffset: Long): OffsetPosition = {...} 56: 57: /** 58: * Get the nth offset mapping from the index 59: * @param n The entry number in the index 60: * @return The offset/position pair at that entry 61: */ 62: def entry(n: Int): OffsetPosition = { 63: maybeLock(lock) { 64: if(n >= entries) 65: throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries)) 66: val idx = mmap.duplicate 67: OffsetPosition(relativeOffset(idx, n), physical(idx, n)) 68: } 69: } 70: 71: /** 72: * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries. 73: */ 74: def append(offset: Long, position: Int) { 75: inLock(lock) { 76: require(!isFull, "Attempt to append to a full index (size = " + size + ").") 77: if (size.get == 0 || offset > lastOffset) { 78: debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) 79: this.mmap.putInt((offset - baseOffset).toInt) 80: this.mmap.putInt(position) 81: this.size.incrementAndGet() 82: this.lastOffset = offset 83: require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") 84: } else { 85: throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." 86: .format(offset, entries, lastOffset, file.getAbsolutePath)) 87: } 88: } 89: }具体看看如何从逻辑offset转化为物理地址的?
0.8中增加了逻辑offset,那么就需要做逻辑offset和物理地址间的转化 简单的方法,直接用hashmap,cache所有offset,问题就是这样空间耗费比较大 所以kafka的方式,是分段索引,用offset通过二分查找中index中找出段的起始地址,然后再去file里面遍历找出精确的地址, 时间换空间的设计
1. LogSegment.translateOffset 首先是从index文件中找到近似的物理地址 前面说了,index中从效率考虑并不会为每个offset建立索引entry,只会分段建立offset索引, 所以从index中直接可以找到精确物理地址的概率不大,但是可以找到最接近的那个物理地址 如果你觉得index的粒度比较粗,可以直接给出开始查找的startingFilePosition 所以精确的物理地址需要到MessageSet文件里面去继续找
2. FileMessageSet.searchFor 在messageSet中,message的构成是,overhead(MessageSize+Offset)和message 而searchFor的逻辑是从startingPosition开始, 逐条遍历各个message,并从overhead中取出offset进行比较,直到找到target offset为止
本文章摘自博客园,原文发布日期:2014-02-18
相关资源:敏捷开发V1.0.pptx