flume -- fileChannel简要分析其过程

    xiaoxiao2021-04-15  207

    flume之event写入FileChannel


    doPut(event)-->获取共享锁后[log.lockShared();]-->FlumeEventPointer ptr = log.put(transactionID, event);


    此处的log.put即将transactionID及event进行后续操作,如下代码所示:

    FlumeEventPointer put(long transactionID, Event event) throws IOException { FlumeEvent flumeEvent = new FlumeEvent( event.getHeaders(), event.getBody());//将event封装成FlumeEvent对象 Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent);//将trId/时间戳/event封装成Put对象 ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); int logFileIndex = nextLogWriter(transactionID); long usableSpace = logFiles.get(logFileIndex).getUsableSpace(); long requiredSpace = minimumRequiredSpace + buffer.limit(); if(usableSpace <= requiredSpace) { throw new IOException("Usable space exhausted, only " + usableSpace + " bytes remaining, required " + requiredSpace + " bytes"); } boolean error = true; try { try { FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); error = false; return ptr; } catch (LogFileRetryableIOException e) { if(!open) { throw e; } roll(logFileIndex, buffer); FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); error = false; return ptr; } } finally { if(error && open) { roll(logFileIndex); } } }

    上述FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);是继续写的,进入:

    synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException { if(encryptor != null) { buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array())); } Pair<Integer, Integer> pair = write(buffer); return new FlumeEventPointer(pair.getLeft(), pair.getRight()); }

    此处的wirte:

    private Pair<Integer, Integer> write(ByteBuffer buffer) throws IOException { if(!isOpen()) { throw new LogFileRetryableIOException("File closed " + file); } long length = position();//正在写入的posititon/offset long expectedLength = length + (long) buffer.limit(); if(expectedLength > maxFileSize) { throw new LogFileRetryableIOException(expectedLength + " > " + maxFileSize); } int offset = (int)length; Preconditions.checkState(offset >= 0, String.valueOf(offset)); // OP_RECORD + size + buffer int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit(); usableSpace.decrement(recordLength); preallocate(recordLength); ByteBuffer toWrite = ByteBuffer.allocate(recordLength); toWrite.put(OP_RECORD); writeDelimitedBuffer(toWrite, buffer); toWrite.position(0); int wrote = getFileChannel().write(toWrite);//将event写入缓存 Preconditions.checkState(wrote == toWrite.limit()); return Pair.of(getLogFileID(), offset);//返回包含fildId和offset的对象 }

    此处的write即使将包装buffer的event写入到内存中,并且返回fileId及上次的offset,

    FlumeEventPointer ptr = log.put(transactionID, event); Preconditions.checkState(putList.offer(ptr), "putList offer failed " + channelNameDescriptor);

    只要返回FlumeEventPointer即说明该event已成功写入磁盘缓冲区(还未强制刷盘)。

    接下来FileBackedTransaction.doPut需要将得到的FlumeEventPointer和事务ID临时存放到内存队列queue的inflightPuts属性中 queue.addWithoutCommit(ptr, transactionID); -->nflightPuts.addEvent(transactionID, e.toLong()); --> inflightEvents.put(transactionID, pointer); inflightFileIDs.put(transactionID, FlumeEventPointer.fromLong(pointer).getFileID()); syncRequired = true;

    ,此处之所以说临时存放,是因为事务还未commit,如果直接将FlumeEventPointer放入queue的已提交数据容器backingStore中,那么就提前暴露给Sink了,这样就不方便区分哪些Event还未被真正提交,不方便后期进行回滚操作。

    queue是FlumeEventQueue的一个实例,对于FileChannel来说,采用的是本地文件作为存储介质,服务器的宕机重启不会带来数据丢失(没有刷盘的数据或磁盘损坏除外),同时为了高效索引文件数据,我们需要在内存中保存写入到文件的Event的位置信息,前面说到Event的位置信息可由FlumeEventPointer对象来存储,FlumeEventPointer是由FlumeEventQueue来管理的,FlumeEventQueue有两个InflightEventWrapper类型的属性:inflightPuts和inflightTakes,InflightEventWrapper是FlumeEventQueue的内部类,专门用来存储未提交Event(位置指针),这种未提交的Event数据称为飞行中的Event(in flight events)。因为这是向Channel的写操作,所以调用InflightEventWrapper的addEvent(Long transactionID,Long pointer)方法将未提交的数据保存在inflightPuts中(inflightEvents.put(transactionID, pointer);),注意,这里有个小细节,InflightEventWrapper的addEvent的第二个入参是Long类型的,而我们要保存的是FlumeEventPointer类型,所以,在FlumeEventPointer.toLong方法中,会将FlumeEventPointer的两个int属性合成一个long属性,以方便存储,方法如下:

    public long toLong() { long result = fileID; result = (long)fileID << 32; result += (long)offset; return result; }

    这一步做完之后,写入Channel的操作就完成了,立马释放共享锁(log.unlockShared())和信号量(queueRemaining.release())

    然后进行事务的commit,当一个事务提交后,Channel需要告知Source Event已经成功保存了,而Source需要通知写入源已经写入成功,即成功ACK.那么。File Channel的事务提交过程由FileBackedTransaction.doCommit方法来完成,对于Source往Channel的写操作(putList的size大于0),如代码: int puts = putList.size(); int takes = takeList.size(); if(puts > 0) { Preconditions.checkState(takes == 0, "nonzero puts and takes " + channelNameDescriptor); log.lockShared(); try { log.commitPut(transactionID); channelCounter.addToEventPutSuccessCount(puts); ...

    还是会先通过Log对象拿到检查点的共享锁(log.lockShared()),拿到共享锁后,将调用Log对象的commit方法,commit方法先将事务ID、新生成的写顺序ID和操作类型(这里是put)来构建一个Commit对象,该对象和上面步骤4提到过的Put对象类似,然后再将该Commit写入数据文件,写入目录下的哪个文件,和写Put对象是一致的,由相同的事务ID来决定,这将意味着在一次事务操作中,一个Commit对象和对应的Put(如果是批次提交,则有多少个Event就有多少个Put)会被写入同一个数据目录下。当然,写入Commit对象到文件后并不需要保留其位置指针。

    做完这一步,将putList中的Event文件位置指针FlumeEventPointer对象依次移除并放入内存队列queue(FlumeEventQueue)的队尾部

    while(!putList.isEmpty()) { if(!queue.addTail(putList.removeFirst())) { StringBuilder msg = new StringBuilder(); msg.append("Queue add failed, this shouldn't be able to "); msg.append("happen. A portion of the transaction has been "); msg.append("added to the queue but the remaining portion "); msg.append("cannot be added. Those messages will be consumed "); msg.append("despite this transaction failing. Please report."); msg.append(channelNameDescriptor); LOG.error(msg.toString()); Preconditions.checkState(false, msg.toString()); } }

    这样Sink才有机会从内存队列queue中索引要取的Event在数据目录中的位置,同时也保证了这批次的Event谁第一个写入的Event也将第一个被放入内存队列也将会第一个被Sink消费。当FlumeEventPointer被全部写入内存队列queue后,则需要将保存在queue.inflightPuts中的该事务ID移除(queue.completeTransaction(transactionID);),因为该事务不再处于“飞行中(in flight events)”了,于是,一个事务真正的被commit了。

    queue.completeTransaction(transactionID); } } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " + channelNameDescriptor, e); } finally { log.unlockShared(); } } else if (takes > 0) { log.lockShared(); try { log.commitTake(transactionID); queue.completeTransaction(transactionID); channelCounter.addToEventTakeSuccessCount(takes); } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " + channelNameDescriptor, e); } finally { log.unlockShared(); } queueRemaining.release(takes); } putList.clear(); takeList.clear(); channelCounter.setChannelSize(queue.getSize()); }

    在上面的步骤中,Event写入了本地磁盘文件,Event的文件指针也被写入了内存队列,这样当节点崩溃Flume被重启时,保存的内存队列FlumeEventQueue将直接消失,从而导致文件指针丢失,sink端将无法定位那些已经准备好的可以被读取的Event在文件中的位置了,所以,在flume中有一个线程经常对内存队列做“备份”,没错,Flume也是会对内存队列做文件备份的,于是,检查点(checkpoint)的概念出现了!还记得Flume官方文档中File Channel中checkpointDir属性么,它就是设置检查点文件存储的目录的。那什么时候会将内存队列备份到检查点文件呢?对于写Channel来说,就是步骤5中queue.addTail操作,这也是为什么步骤5我们没仔细分析该步骤的原因。其实,写检查点文件也是比较复杂的一个过程,因为它要保证写时效率,也要保证恢复时能将数据重新加载到内存队列中。

    addTail()方法

    在介绍该方法之前,我们来看看内存队列FlumeEventQueue中最重要的成员属性:backingStore,它是EventQueueBackingStore接口的实现,实现类为EventQueueBackingStoreFile。backingStore主要有两个作用:

    (1) 保存已经提交了的Event位置指针,放在一个Map结构的overwriteMap属性中;

    (2) 是将已提交但还未来得及被消费的Event位置指针数据写入检查点文件,这是通过将检查点文件映射到一块虚拟内存中来完成的,这个虚拟内存结构就是backingStore的mappedBuffer属性,它是MappedByteBuffer类型。overwriteMap和mappedBuffer中的数据是不相交的,因为有个后台调度任务(默认30秒执行一次,最短能每1秒钟执行一次)会定时将overwriteMap的数据写入mappedbuffer,每写一个Event指针就把该Event指针从overwriteMap中移除,所以overwriteMap + mappedbuffer几乎就可以说是内存队列中未被消费的全量Event指针数据。

    logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length); workerExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name) .build()); workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this), this.checkpointInterval, this.checkpointInterval, TimeUnit.MILLISECONDS);

    具体线程:

    static class BackgroundWorker implements Runnable { private static final Logger LOG = LoggerFactory .getLogger(BackgroundWorker.class); private final Log log; public BackgroundWorker(Log log) { this.log = log; } @Override public void run() { try { if (log.open) { log.writeCheckpoint(); } } catch (IOException e) { LOG.error("Error doing checkpoint", e); } catch (Throwable e) { LOG.error("General error in checkpoint worker", e); } } }
    log.writeCheckpoint()
    private Boolean writeCheckpoint(Boolean force) throws Exception { boolean checkpointCompleted = false; long usableSpace = checkpointDir.getUsableSpace(); if(usableSpace <= minimumRequiredSpace) { throw new IOException("Usable space exhausted, only " + usableSpace + " bytes remaining, required " + minimumRequiredSpace + " bytes"); } lockExclusive(); SortedSet<Integer> logFileRefCountsAll = null, logFileRefCountsActive = null; try { if (queue.checkpoint(force)) { <!-- synchronized boolean checkpoint(boolean force) throws Exception {--> <!-- if (!backingStore.syncRequired()--> <!-- && !inflightTakes.syncRequired()--> <!-- && !force) { //No need to check inflight puts, since that would--> <!-- //cause elements.syncRequired() to return true.--> <!-- LOG.debug("Checkpoint not required");--> <!-- return false;--> <!-- }--> <!-- backingStore.beginCheckpoint();//检查是否checkpoint正在进行;同时进行标记checkpoint开始,并同步MMAP file;--> <!-- inflightPuts.serializeAndWrite();--> <!-- inflightTakes.serializeAndWrite();//将inflightputs/takes序列化并写到相应文件--> <!-- backingStore.checkpoint();--> <!-- return true;--> <!--}`--> long logWriteOrderID = queue.getLogWriteOrderID(); logFileRefCountsAll = queue.getFileIDs(); logFileRefCountsActive = new TreeSet<Integer>(logFileRefCountsAll); int numFiles = logFiles.length(); for (int i = 0; i < numFiles; i++) { LogFile.Writer logWriter = logFiles.get(i); int logFileID = logWriter.getLogFileID(); File logFile = logWriter.getFile(); LogFile.MetaDataWriter writer = LogFileFactory.getMetaDataWriter(logFile, logFileID); try { writer.markCheckpoint(logWriter.position(), logWriteOrderID); } finally { writer.close(); } logFileRefCountsAll.remove(logFileID); LOGGER.info("Updated checkpoint for file: " + logFile + " position: " + logWriter.position() + " logWriteOrderID: " + logWriteOrderID); } // Update any inactive data files as well Iterator<Integer> idIterator = logFileRefCountsAll.iterator(); while (idIterator.hasNext()) { int id = idIterator.next(); LogFile.RandomReader reader = idLogFileMap.remove(id); File file = reader.getFile(); reader.close(); LogFile.MetaDataWriter writer = LogFileFactory.getMetaDataWriter(file, id); try { writer.markCheckpoint(logWriteOrderID); } finally { reader = LogFileFactory.getRandomReader(file, encryptionKeyProvider, fsyncPerTransaction); idLogFileMap.put(id, reader); writer.close(); } LOGGER.debug("Updated checkpoint for file: " + file + "logWriteOrderID " + logWriteOrderID); idIterator.remove(); } Preconditions.checkState(logFileRefCountsAll.size() == 0, "Could not update all data file timestamps: " + logFileRefCountsAll); //Add files from all log directories for (int index = 0; index < logDirs.length; index++) { logFileRefCountsActive.add(logFiles.get(index).getLogFileID()); } checkpointCompleted = true; } } finally { unlockExclusive(); } //Do the deletes outside the checkpointWriterLock //Delete logic is expensive. if (open && checkpointCompleted) { removeOldLogs(logFileRefCountsActive); } //Since the exception is not caught, this will not be returned if //an exception is thrown from the try. return true; }

    如果Sink想从File Channel取出一个Event来消费,FlumeEventQueue会先尝试从overwriteMap中取,如果找不到,就从mappedBuffer中取。当然,由于mappedBuffer是ByteBuffer,所以为了操作方便,在mappedBuffer之上建立了一个LongBuffer的视图,即backingStore的elementsBuffer属性(elementsBuffer = mappedBuffer.asLongBuffer();)。好了,回过头来,我们继续说addTail方法的实现,它主要分为如下几个步骤:

    1.先检查backingStore的容量(backingStore的容量和File Channel的容量一致,这也保证了及时整个File Channel中的数据没被消费,内存队列也能完整备份到检查点文件中),如果满了则告知失败,返回false,这时候会打印错误日志,但不会抛异常,也不会影响整个事务的提交(为什么只是打印错误日志呢?因为Flume的作者认为这中情况不太可能发生,因为事务队列中保留了部分空余空间)。

    2.还是老规矩,将Event指针对象FlumeEventPointer整合成一个long类型。

    3.backingStore中有个属性logFileIDReferenceCounts,它是个Map结构,key为数据文件logFile的文件ID,value为该文件中还未被消费的Event指针计数器。于是需要将该Event指针所写的数据文件ID对应的计数器+1?大家有没有想过这是为什么?对,如果当某个logFile文件的计数器为0,表明该文件的Event全被Sink消费了,也就是说Flume可以在需要时删除该logFile及其相关的检查点或备份文件了。

    4.backingStore有个int型的queueSize属性,用来记录放入overwriteMap的Event指针数,于是,这一步是将queueSize和Event指针放入overwriteMap中。

    5.queueSize+1.

    6.backingStore还有个Set类型的queueSet属性,用来存放所有的Event指针的数据,并映射到文件,queueSet中包含了overwriteMap和mappedBuffer中所有的Event指针。所以这一步就是将该Event指针放入queueSet中。

    经过这6步,内存队列FlumeEventQueue的addTail操作就完成了。

    flume之replay

    当FlumeChannel启动时,或者故障恢复时,会经历一次重播(replay)过程,重播的目的就是还原上一次的“现场”,当然,最主要的就是恢复FlumeEventQueue中的内存队列相关数据。重播的主要实现是有Log类来做的,Log类的replay实现了整个重播过程,简单来说,重播过程分为如下几个步骤:

    步骤1:获取检查点文件的独占锁(checkpointWriterLock.lock();)。

    步骤2:将数据文件ID的初始值设置成0(nextFileID.set(0);)。

    步骤3:便利所有数据文件目录(dataDirs),将所有数据文件放入文件列表dataFiles中,将nextFileID设置成当前存在的数据文件ID的值,便于后期调用能生成正确的数据文件ID,将数据文件ID和数据文件的随机读取器之间的映射关系放入idLogFileMap之中。

    步骤4:对数据文件列表dataFiles安装数据文件ID进行升序排序。

    步骤5:如果use-fast-replay设置成ture(默认值为false)且检查点文件(checkpoint)不存在,则进行快速全播(fast full replay)流程,见步骤7。

    步骤6:如果use-fast-replay设置成false,则通过检查点进行重播。见步骤8.

    步骤7:类似于步骤8,只是不通过检查点的中最大的写顺序ID开始重播而已(因为当时还没有检查点)。

    步骤8:记住检查点的当前写顺序ID(从checkpoint.meta中获得),从inflightputs文件反序列化得到未提交的put的事务ID和Event指针(FlumeEventPointer)的映射inflightPuts,从inflighttakes文件中反序列化得到未提交的take的事务ID和Event指针的映射inflightTakes。

    步骤9:遍历数据文件(通过前面得到的数据文件列表dataFiles),找出有包含比检查点的写数据ID还大的写数据ID的数据文件,因为这些数据文件才对恢复File Channel内存队列有用,别忘了File Channel内存队列保存的是所有未被消费的FlumeEventPoint(这当然也需要把那些没完成事务最后一步commit动作的事务给继续完成),并把这些数据文件的该写顺序ID后面的下一个行记录对象(即LogRecord,LogRecord中不仅包含数据记录,还包含记录类型和操作类型,记录类型有put/take/commit/rollback,操作类型有put/take)保存到队列logRecordBuffer中。

    步骤10:通过logRecordBuffer可以将所有需要读取的数据文件的剩余部分遍历一遍,这样,我们可以得到一个只包含put和take操作但却未commit和rollback的事务ID对应FlumeEventPoint的Map,即transactionMap,同时,还会更新步骤8中提到的inflightTakes,移除掉已经成功commit的take事务。如果发现有已经提交的事务,则需要进行提交处理,如果是commit的put事务,则将其FlumeEventPoint添加到内存队列队尾,如果是commit的take事务,则从内存队列中移除。 当然,还可以得到当前最大的事务ID(transactionIDSeed)和最大的写顺序ID(writeOrderIDSeed),这个是为了让后面生成的事务ID和写顺序ID可用(TransactionIDOracle.setSeed(transactionIDSeed);WriteOrderOracle.setSeed(writeOrderIDSeed);)。这一步做完,内存队列中已经包含了所有已经完成事务commit但并没有被Sink消费的所有FlumeEventPoint了。

    步骤11:将所有没有commit的take事务所包含的数据(inflightTakes中的数据)重新插入到内存队列的头部。

    从以上步骤可以看出,Flume中有两种重播方式,一种是不通过检查点(此时必须检查点不存在且配置的use-fast-replay为true)的“快速全播”,一种是普通的通过检查点重播,这也是默认的重播方式。重播的目的就是为了通过磁盘文件来恢复File Channel的内存队列,使File Channel能继续运行,重播需要的时间和当时内存队列中未被消费的FlumeEventPoint成正比

    关于步骤1,自不必多说,因为重播过程中,是不能接收消息的,就像JVM GC真正执行时需要StopWorld一样。这里再次阐述一次,为了保证每个数据目录(dataDir)下数据文件的合理大小,当数据超过一个数据文件的最大容量时,Flume会自动在当前目录新建一个数据文件,为了区分同一个数据目录下的数据文件,Flume采用文件名加数字后缀的形式(比如log-1、log-2),其中的数字后缀就是数据文件ID,它由Log实例中的nextFileID属性来维护,它是原子整形,由于在Flume Agent实例中,一个File Channel会对应一个Log实例,所以数据文件ID是唯一的,即使你配置了多个数据目录。每个数据文件都有一个对应的元数据文件(MetaDataFile),它和数据文件在同一目录,命名则是在数据文件后面加上.meta,比如log-1对应的元数据文件是log-1.meta,其实就是将Checkpoint对象通过谷歌的Protos协议序列化到元数据文件中,Checkpoint存储了对应数据文件的诸多重要信息,比如版本、写顺序ID(logWriteOrderID)、队列大小(queueSize)和队列头索引(queueHead)等。元数据文件主要用于快速将数据文件的信息载入到内存中。注意,检查点也有自己的元数据文件(checkpoint.meta)。其实,步骤1-4理解起来都不难,我们知道,检查点目录(checkpointDir)下一共有四个文件(除去锁文件和检查点的元数据文件):检查点文件(checkpoint)、提取未提交文件(inflighttakes)、写入未提交文件(inflightputs)和队列集合文件(queueset,更准确来说这是目录),内存队列正式在这四个文件的相互补充下,得到完整的恢复。

    其中上述的步骤10是关键,但也容易理解,比如最常用的通过检查点来恢复,检查点的元数据文件中保存的最大写顺序ID,说明在这个写顺序ID之前的数据要不就已经在检查点文件中了要不就已经被Sink消费掉了,所以通过检查点文件恢复的内存队列,还需要补充两种数据:第一种数据是已经commit的数据但还未来得及对检查点数据刷盘(默认每30秒将内存队列写入检查点文件,可通过checkpointInterval来设置)。第二种数据是飞行中的数据,即还未来得及commit的数据。步骤10中提到的transactionMap的类型是MultiValueMap,所以可以从一个事务ID中找到和其相关的所有操作记录,从上述的步骤10和步骤11可以看出,对于第一种数据,可以通过遍历包含了检查点最大写顺序ID之后数据的数据文件来将其加载到内存队列中,但这时候内存队列中的数据是有冗余的,包含了已经被消费的commit事务的数据,所以这时候未提交数据凭证文件(inflightputs文件和inflighttakes文件)中的数据就起到作用了,将检查点写顺序ID后的所有事务数据(transactionMap)和通过inflightPuts映射、inflightTakes经过运算得到的数据(put未commit或者put已经commit但take没commit的数据)取“交集”得到的FlumeEventPoint集合,就是内存队列需要补充的数据了。


    最新回复(0)