StateBackend is the pluggable backend for state storage; the built-in implementations are memory, filesystem (fs), and RocksDB. In short, the matching StateBackend is created according to the `CheckpointingOptions.STATE_BACKEND` setting.
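Before diving into the loader code, here is a minimal sketch of the two ways a backend can be supplied; the paths and keys are illustrative, not taken from the walkthrough below:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // (1) set the backend explicitly from application code (the "fromApplication" branch below) ...
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
        // (2) ... or omit this call and configure flink-conf.yaml instead:
        //       state.backend: filesystem
        //     which loadStateBackendFromConfig resolves via CheckpointingOptions.STATE_BACKEND.
    }
}
```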
```java
// StreamTask: creates the operator's stateBackend and keyed stateBackend
protected StateBackend stateBackend;

stateBackend = createStateBackend();
// create the checkpoint storage engine
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

private StateBackend createStateBackend() throws Exception {
    // backend configured by the application, if any
    final StateBackend fromApplication = configuration.getStateBackend(getUserCodeClassLoader());

    return StateBackendLoader.fromApplicationOrConfigOrDefault(
        fromApplication,
        getEnvironment().getTaskManagerInfo().getConfiguration(),
        getUserCodeClassLoader(),
        LOG);
}

public static StateBackend fromApplicationOrConfigOrDefault(
        @Nullable StateBackend fromApplication,
        Configuration config,
        ClassLoader classLoader,
        @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

    final StateBackend backend;

    // (1) use the backend configured by the application, if present
    if (fromApplication != null) {
        if (fromApplication instanceof ConfigurableStateBackend) {
            backend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
        } else {
            backend = fromApplication;
        }
    } else {
        // (2) create the StateBackend selected by CheckpointingOptions.STATE_BACKEND
        final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);

        if (fromConfig != null) {
            backend = fromConfig;
        } else {
            // (3) fall back to the default in-memory backend
            backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
            if (logger != null) {
                logger.info("No state backend has been configured, using default (Memory / JobManager) {}", backend);
            }
        }
    }

    // legacy code path: derive a checkpoint path from the HA storage directory
    if (backend instanceof MemoryStateBackend) {
        final MemoryStateBackend memBackend = (MemoryStateBackend) backend;
        if (memBackend.getCheckpointPath() == null && HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {
            final String haStoragePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
            if (haStoragePath != null) {
                try {
                    Path checkpointDirPath = new Path(haStoragePath, UUID.randomUUID().toString());
                    if (checkpointDirPath.toUri().getScheme() == null) {
                        checkpointDirPath = checkpointDirPath.makeQualified(checkpointDirPath.getFileSystem());
                    }
                    Configuration tempConfig = new Configuration(config);
                    tempConfig.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDirPath.toString());
                    return memBackend.configure(tempConfig, classLoader);
                } catch (Exception ignored) {}
            }
        }
    }

    return backend;
}

// create the StateBackend named by CheckpointingOptions.STATE_BACKEND
public static StateBackend loadStateBackendFromConfig(
        Configuration config,
        ClassLoader classLoader,
        @Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {

    final String backendName = config.getString(CheckpointingOptions.STATE_BACKEND);

    // by default the factory class is the backend name
    String factoryClassName = backendName;

    switch (backendName.toLowerCase()) {
        case MEMORY_STATE_BACKEND_NAME:
            MemoryStateBackend memBackend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
            return memBackend;

        case FS_STATE_BACKEND_NAME:
            FsStateBackend fsBackend = new FsStateBackendFactory().createFromConfig(config, classLoader);
            return fsBackend;

        case ROCKSDB_STATE_BACKEND_NAME:
            factoryClassName = "org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory";
            // deliberate fall-through: RocksDB is loaded reflectively, like any custom factory

        default:
            StateBackendFactory<?> factory;
            try {
                @SuppressWarnings("rawtypes")
                Class<? extends StateBackendFactory> clazz =
                    Class.forName(factoryClassName, false, classLoader)
                        .asSubclass(StateBackendFactory.class);
                factory = clazz.newInstance();
            } catch (ClassNotFoundException e) {
                // exception handling elided
            } catch (ClassCastException | InstantiationException | IllegalAccessException e) {
                // exception handling elided
            }
            return factory.createFromConfig(config, classLoader);
    }
}

// creating the CheckpointStreamFactory
private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
        checkpointMetaData.getCheckpointId(),
        checkpointOptions.getTargetLocation());

    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);

    checkpointingOperation.executeCheckpointing();
}

public CheckpointStreamFactory resolveCheckpointStorageLocation(
        long checkpointId,
        CheckpointStorageLocationReference reference) throws IOException {

    // e.g. checkpointDir = file:/H:/flinkTest/checkpoint/ee510cc375b56bea45df694a1bd56f6f/chk-1
    if (reference.isDefaultReference()) {
        // default reference, construct the default location for that particular checkpoint
        final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
        return new FsCheckpointStorageLocation(
            fileSystem,
            checkpointDir,
            sharedStateDirectory,
            taskOwnedStateDirectory,
            reference,
            fileSizeThreshold);
    } else {
        // location encoded in the reference
        final Path path = decodePathFromReference(reference);
        return new FsCheckpointStorageLocation(
            path.getFileSystem(),
            path,
            path,
            path,
            reference,
            fileSizeThreshold);
    }
}
```
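To make the resolved locations concrete, here is a sketch of the directory layout implied by the example path in the comment above. The job id and drive letter are from that example; the subdirectory names are assumptions derived from the constructor arguments (sharedStateDirectory, taskOwnedStateDirectory):

```java
// Sketch (assumed layout, derived from the arguments above):
//
//   file:/H:/flinkTest/checkpoint/ee510cc375b56bea45df694a1bd56f6f/
//       chk-1/       <- exclusive state of checkpoint 1 (CheckpointedStateScope.EXCLUSIVE)
//       shared/      <- sharedStateDirectory, reused across checkpoints (incremental snapshots)
//       taskowned/   <- taskOwnedStateDirectory, state the JobManager never disposes
//
// createCheckpointDirectory simply appends a "chk-<id>" child to the base directory:
Path checkpointDir = new Path(checkpointsDirectory, "chk-" + checkpointId);
```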
Initialization of checkpointOptions' target location:

```java
// CheckpointCoordinator
final CheckpointStorageLocation checkpointStorageLocation;
checkpointStorageLocation = props.isSavepoint()
    ? checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation)
    : checkpointStorage.initializeLocationForCheckpoint(checkpointID);

// FsCheckpointStorage
public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
    checkArgument(checkpointId >= 0);

    final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);

    // create the checkpoint directory
    fileSystem.mkdirs(checkpointDir);

    return new FsCheckpointStorageLocation(
        fileSystem,
        checkpointDir,
        sharedStateDirectory,
        taskOwnedStateDirectory,
        CheckpointStorageLocationReference.getDefault(),
        fileSizeThreshold);
}
```

FlinkKafkaConsumerBase is the base class of the Kafka source. It implements both CheckpointListener and CheckpointedFunction: initializeState restores state, snapshotState stores it, and notifyCheckpointComplete is the callback invoked once a snapshot completes. The state here is operator state, and the default state store is DefaultOperatorStateBackend. (A minimal user-level sketch of this restore/snapshot pattern follows the backend code below.)

1. In initializeState, unionOffsetStates is initialized.
2. getUnionListState first looks up the partitionableListState by name in registeredOperatorStates; if none is registered yet, a new PartitionableListState is constructed.
```java
// FlinkKafkaConsumerBase

// state name
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

// state: mapping from partition to offset
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

public final void initializeState(FunctionInitializationContext context) throws Exception {
    OperatorStateStore stateStore = context.getOperatorStateStore();
    // ...
    this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
        OFFSETS_STATE_NAME,
        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
    // ...
}

// DefaultOperatorStateBackend
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
    return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}

private <S> ListState<S> getListState(
        ListStateDescriptor<S> stateDescriptor,
        OperatorStateHandle.Mode mode) throws StateMigrationException {

    // elided
    PartitionableListState<S> partitionableListState =
        (PartitionableListState<S>) registeredOperatorStates.get(name);

    if (null == partitionableListState) {
        // construct a new partitionableListState
        partitionableListState = new PartitionableListState<>(
            new RegisteredOperatorStateBackendMetaInfo<>(
                name,
                partitionStateSerializer,
                mode));

        // register it in registeredOperatorStates
        registeredOperatorStates.put(name, partitionableListState);
    } else {
        // has restored state; check compatibility of new state access
        // elided
    }

    accessedStatesByName.put(name, partitionableListState);
    return partitionableListState;
}
```
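The same pattern is available to any user function. Below is a minimal sketch (a hypothetical class, not from the Flink sources) that restores a union list state in initializeState and rewrites it in snapshotState, mirroring what FlinkKafkaConsumerBase does with its offsets:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class OffsetTrackingFunction implements CheckpointedFunction {

    private transient ListState<Long> offsetsState;             // managed operator state
    private final Map<Integer, Long> currentOffsets = new HashMap<>();

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // looked up by name in registeredOperatorStates, as shown above
        offsetsState = context.getOperatorStateStore().getUnionListState(
            new ListStateDescriptor<>("offsets", Long.class));

        if (context.isRestored()) {
            for (Long restored : offsetsState.get()) {
                // merge the restored offsets back into currentOffsets ...
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        offsetsState.clear();                                   // drop the previous snapshot
        for (Long offset : currentOffsets.values()) {
            offsetsState.add(offset);                           // record current progress
        }
    }
}
```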
3. When snapshotState runs, the offsets of the current checkpoint are added to unionOffsetStates, so registeredOperatorStates now holds this state as well.

```java
// FlinkKafkaConsumerBase
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    // elided
    if (fetcher == null) {
        // elided
    } else {
        HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
            pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
        }

        for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
            unionOffsetStates.add(
                Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
        }
    }
    // elided
}

// AbstractStreamOperator
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp,
        CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {
    // elided
    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
    // elided
}

// DefaultOperatorStateBackend
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    long syncStartTime = System.currentTimeMillis();

    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
        snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

    snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
    return snapshotRunner;
}
```

Several classes are involved here; their roles are:

- CheckpointStateOutputStream: the checkpoint output stream, which writes state to a file or to memory.
- StateMetaInfoSnapshot: state metadata, containing the state name, the backend state type (operator, key-value, broadcast, etc.) and the associated serializer information.
- DataOutputViewStreamWrapper: wraps an output stream (here the CheckpointStateOutputStream) as a DataOutputView, the view through which state data is serialized.
- OperatorStateHandle: a handle to operator state, through which the state can be accessed.
- OperatorStateHandle.StateMetaInfo: the metadata of an OperatorStateHandle, holding the partition offsets and the distributionMode (SPLIT_DISTRIBUTE, UNION or BROADCAST).
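Before walking through the snapshot strategy below, a small illustration of what OperatorStateHandle.StateMetaInfo carries; the offset values are hypothetical:

```java
import org.apache.flink.runtime.state.OperatorStateHandle;

// For "topic-partition-offset-states", `offsets` records where each list
// partition starts in the checkpoint stream; the mode controls redistribution
// on restore (UNION: every subtask receives all entries).
long[] offsets = {128L, 256L, 384L};  // hypothetical stream positions
OperatorStateHandle.StateMetaInfo meta =
    new OperatorStateHandle.StateMetaInfo(offsets, OperatorStateHandle.Mode.UNION);
```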
```java
// DefaultOperatorStateBackendSnapshotStrategy
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

    // if registeredOperatorStates and registeredBroadcastStates are both empty, return an empty result
    if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
        return DoneFuture.of(SnapshotResult.empty());
    }

    // deep copy of registeredOperatorStates
    final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
        new HashMap<>(registeredOperatorStates.size());
    // deep copy of registeredBroadcastStates
    final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
        new HashMap<>(registeredBroadcastStates.size());

    ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(userClassLoader);
    try {
        if (!registeredOperatorStates.isEmpty()) {
            for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                PartitionableListState<?> listState = entry.getValue();
                if (null != listState) {
                    // copy the listState
                    listState = listState.deepCopy();
                }
                // put the copy into registeredOperatorStatesDeepCopies
                registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
            }
        }
        // the broadcast states are copied into registeredBroadcastStatesDeepCopies the same way
    } finally {
        Thread.currentThread().setContextClassLoader(snapshotClassLoader);
    }

    AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
        new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

        @Override
        protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

            // create a checkpoint output stream
            CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
            snapshotCloseableRegistry.registerCloseable(localOut);

            // collect the operator state meta info into operatorMetaInfoSnapshots
            List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                new ArrayList<>(registeredOperatorStatesDeepCopies.size());
            for (Map.Entry<String, PartitionableListState<?>> entry :
                    registeredOperatorStatesDeepCopies.entrySet()) {
                operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
            }
            // broadcast states are handled analogously

            DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
            OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
            // write operatorStateMetaInfoSnapshots and broadcastStateMetaInfoSnapshots to the checkpoint stream
            backendSerializationProxy.write(dov);

            // write the operator state and broadcast state metadata; the key of StateMetaInfo is the
            // snapshot name, e.g. topic-partition-offset-states, and the value holds the offsets
            int initialMapCapacity =
                registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
            final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                new HashMap<>(initialMapCapacity);

            for (Map.Entry<String, PartitionableListState<?>> entry :
                    registeredOperatorStatesDeepCopies.entrySet()) {

                PartitionableListState<?> value = entry.getValue();
                long[] partitionOffsets = value.write(localOut);
                OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                writtenStatesMetaData.put(
                    entry.getKey(),
                    new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
            }

            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                    registeredBroadcastStatesDeepCopies.entrySet()) {

                BackendWritableBroadcastState<?, ?> value = entry.getValue();
                long[] partitionOffsets = {value.write(localOut)};
                OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                writtenStatesMetaData.put(
                    entry.getKey(),
                    new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
            }

            // create a state handle through which the state can later be accessed
            OperatorStateHandle retValue = null;
            if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
                StreamStateHandle stateHandle = localOut.closeAndGetHandle();
                if (stateHandle != null) {
                    retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                }
                return SnapshotResult.of(retValue);
            } else {
                throw new IOException("Stream was already unregistered.");
            }
        }

        // elided
    };

    final FutureTask<SnapshotResult<OperatorStateHandle>> task =
        snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

    if (!asynchronousSnapshots) {
        task.run();
    }

    return task;
}
```

RocksDBStateBackend is the only backend that supports incremental state snapshots; the state.backend.incremental option decides whether checkpoints are incremental or full, and the default is full.

RocksDB's architecture, in brief: RocksDB introduces the concept of a ColumnFamily (CF), a dataset made up of key-value pairs, and every read or write must name a column family. A write first goes to the WAL, then to the memtable; once the memtable reaches a threshold it is switched to an immutable memtable, which can be read but no longer written. A background flush thread writes immutable memtables to disk in time order, producing sorted files (SSTs) at level 0, and background compaction threads merge upper-level SSTs into lower-level ones. The Manifest records the set of SST files visible at a point in time, and the Current file records the name of the latest Manifest. Each ColumnFamily has its own memtables and SST files, while all ColumnFamilies share the WAL, Current, and Manifest files.
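A small sketch of those column-family concepts using the RocksDB Java API (rocksdbjni) directly, independent of Flink; the path and CF name are illustrative and error handling is omitted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ColumnFamilyDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        // one descriptor per column family; the "default" CF must always be opened
        List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
            new ColumnFamilyDescriptor("sum-state".getBytes()));
        List<ColumnFamilyHandle> handles = new ArrayList<>();

        try (DBOptions options = new DBOptions()
                 .setCreateIfMissing(true)
                 .setCreateMissingColumnFamilies(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo", descriptors, handles)) {
            // every read and write names a column family, as described above
            db.put(handles.get(1), "key".getBytes(), "value".getBytes());
            byte[] value = db.get(handles.get(1), "key".getBytes());
        }
    }
}
```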
```java
// RocksDBKeyedStateBackendBuilder.initializeSavepointAndCheckpointStrategies
if (enableIncrementalCheckpointing) {
    checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>(
        db,
        rocksDBResourceGuard,
        keySerializerProvider.currentSchemaSerializer(),
        kvStateInformation,
        keyGroupRange,
        keyGroupPrefixBytes,
        localRecoveryConfig,
        cancelStreamRegistry,
        instanceBasePath,
        backendUID,
        materializedSstFiles,
        lastCompletedCheckpointId,
        numberOfTransferingThreads);
} else {
    checkpointSnapshotStrategy = savepointSnapshotStrategy;
}
return new SnapshotStrategy<>(checkpointSnapshotStrategy, savepointSnapshotStrategy);
```

RocksDBStateBackend stores in-flight data in a RocksDB database, which by default lives under the TaskManager's data directory. At checkpoint time, the whole RocksDB database is checkpointed to the configured file system and directory; only minimal metadata is kept in the JobManager's memory (or, in high-availability mode, in the metadata checkpoint).

Limitation: because RocksDB's JNI bridge API is based on byte[], the maximum supported size per key and per value is 2^31 bytes. Important: states that use merge operations in RocksDB (e.g. ListState) can silently accumulate values larger than 2^31 bytes and will then fail on the next retrieval. This is currently a limitation of the RocksDB JNI.
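Two equivalent ways to switch checkpoints to incremental mode (the path is illustrative); the boolean constructor argument below corresponds to the enableIncrementalCheckpointing flag checked above:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // second argument = enableIncrementalCheckpointing;
        // equivalently set `state.backend.incremental: true` in flink-conf.yaml
        StateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        env.setStateBackend(backend);
    }
}
```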
```scala
object ManagedKeyedStateStreaming {

  private val LOG = LoggerFactory.getLogger(ManagedKeyedStateStreaming.getClass)
  private val KAFKA_CONSUMER_TOPIC = "Test1"
  private val KAFKA_BROKERS = "node00:9092"
  private val KAFKA_ZOOKEEPER_CONNECTION = "node00:2181"
  private val KAFKA_GROUP_ID = "flink-demo-group"
  private val KAFKA_PROP: Properties = new Properties() {
    setProperty("bootstrap.servers", KAFKA_BROKERS)
    setProperty("zookeeper.connect", KAFKA_ZOOKEEPER_CONNECTION)
    setProperty("group.id", KAFKA_GROUP_ID)
    setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  }

  def main(args: Array[String]): Unit = {
    LOG.info("===Stateful Computation Demo===")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // one checkpoint every 5 seconds
    // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // time characteristic of processing
    env.setRestartStrategy(RestartStrategies.noRestart()) // restart strategy
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // exactly-once semantics
    // val checkPointPath = new Path("file:///H:/flinkTest/checkpoint") // fs state backend; with file:/// the data lives on the TaskManager's local disk
    val fsStateBackend: StateBackend = new RocksDBStateBackend("file:///H:/flinkTest/checkpoint")
    env.setStateBackend(fsStateBackend)
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // keep the checkpoint on cancellation

    val dataStream = env.addSource(
      new FlinkKafkaConsumer010[String](KAFKA_CONSUMER_TOPIC, new SimpleStringSchema(), KAFKA_PROP))
    dataStream.filter(_.split("\\|").length == 3)
      .map(line => {
        val arr = line.split("\\|")
        (arr(0), arr(2).toInt)
      }).keyBy(_._1)
      .flatMap(new SalesAmountCalculation())
      .print()
    env.execute("Stateful Computation Demo")
  }
}

// aggregates a running total
class SalesAmountCalculation extends RichFlatMapFunction[(String, Int), (String, Int)] {

  private var sum: ValueState[(String, Int)] = _

  override def flatMap(input: (String, Int), out: Collector[(String, Int)]): Unit = {
    // an expired state value is deleted when it is explicitly read; expired values can also be
    // cleaned up when a full snapshot is taken, e.g.:
    // val ttlConfig = StateTtlConfig
    //   .newBuilder(Time.seconds(1))
    //   .cleanupFullSnapshot
    //   .build
    val tmpCurrentSum = sum.value
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (input._1, 0)
    }
    val newSum = (currentSum._1, currentSum._2 + input._2)
    sum.update(newSum)
    out.collect(newSum)
  }

  override def open(parameters: Configuration): Unit = {
    // configuring a time-to-live for the state value:
    // val ttlConfig = StateTtlConfig
    //   .newBuilder(Time.seconds(1)) // TTL of 1 second
    //   .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh the TTL on create and write
    //   .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return expired values
    //   .build
    val valueStateDescriptor = new ValueStateDescriptor[(String, Int)]("sum", createTypeInformation[(String, Int)])
    // valueStateDescriptor.enableTimeToLive(ttlConfig) // enable the TTL configuration
    sum = getRuntimeContext.getState(valueStateDescriptor)
  }
}
```

The demo above consumes data from Kafka, splits each line on |, then groups by the first field and sums the third. The state backend is set to RocksDBStateBackend, with a checkpoint interval of 5 seconds.
```java
// AbstractStreamOperator
// the demo's state is keyed state, so the branch below is taken
if (null != keyedStateBackend) {
    snapshotInProgress.setKeyedStateManagedFuture(
        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}

// RocksDBKeyedStateBackend
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    long startTime = System.currentTimeMillis();

    writeBatchWrapper.flush();

    RocksDBSnapshotStrategyBase<K> chosenSnapshotStrategy =
        CheckpointType.SAVEPOINT == checkpointOptions.getCheckpointType()
            ? savepointSnapshotStrategy
            : checkpointSnapshotStrategy;

    RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner =
        chosenSnapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

    return snapshotRunner;
}

// RocksDBSnapshotStrategyBase
public final RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    if (kvStateInformation.isEmpty()) {
        return DoneFuture.of(SnapshotResult.empty());
    } else {
        return doSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }
}

// RocksFullSnapshotStrategy
public RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory primaryStreamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
        createCheckpointStreamSupplier(checkpointId, primaryStreamFactory, checkpointOptions);

    final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
    final List<RocksDbKvStateInfo> metaDataCopy = new ArrayList<>(kvStateInformation.size());

    for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) {
        // snapshot meta info
        stateMetaInfoSnapshots.add(stateInfo.metaInfo.snapshot());
        metaDataCopy.add(stateInfo);
    }

    final ResourceGuard.Lease lease = rocksDBResourceGuard.acquireResource();
    final Snapshot snapshot = db.getSnapshot();

    final SnapshotAsynchronousPartCallable asyncSnapshotCallable =
        new SnapshotAsynchronousPartCallable(
            checkpointStreamSupplier,
            lease,
            snapshot,
            stateMetaInfoSnapshots,
            metaDataCopy,
            primaryStreamFactory.toString());

    return asyncSnapshotCallable.toAsyncSnapshotFutureTask(cancelStreamRegistry);
}

// RocksFullSnapshotStrategy.SnapshotAsynchronousPartCallable
protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
    final KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange);
    final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider =
        checkpointStreamSupplier.get();

    snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
    writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);

    if (snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
        return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(
            checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(),
            keyGroupRangeOffsets);
    } else {
        throw new IOException("Stream is already unregistered/closed.");
    }
}

private void writeSnapshotToOutputStream(
        @Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
        @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {

    final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
        new ArrayList<>(metaData.size());
    final DataOutputView outputView =
        new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
    final ReadOptions readOptions = new ReadOptions();
    try {
        readOptions.setSnapshot(snapshot);
        // write the KVState metadata (e.g. state type VALUE, name "sum"), serialized before writing
        writeKVStateMetaData(kvStateIterators, readOptions, outputView);
        // write the k/v data to the checkpoint stream
        writeKVStateData(kvStateIterators, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
    } finally {
        // close the iterators and read options (elided)
    }
}

private void writeKVStateData(
        final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider,
        final KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {

    byte[] previousKey = null;
    byte[] previousValue = null;
    DataOutputView kgOutView = null;
    OutputStream kgOutStream = null;
    CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream =
        checkpointStreamWithResultProvider.getCheckpointOutputStream();

    try {
        // Here we transfer ownership of RocksIterators to the RocksStatesPerKeyGroupMergeIterator
        try (RocksStatesPerKeyGroupMergeIterator mergeIterator =
                 new RocksStatesPerKeyGroupMergeIterator(kvStateIterators, keyGroupPrefixBytes)) {

            // preamble: setup with first key-group as our lookahead
            if (mergeIterator.isValid()) {
                // begin first key-group by recording the offset
                keyGroupRangeOffsets.setKeyGroupOffset(
                    mergeIterator.keyGroup(),
                    checkpointOutputStream.getPos());
                // write the k/v-state id as metadata
                kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                // TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                kgOutView.writeShort(mergeIterator.kvStateId());
                previousKey = mergeIterator.key();
                previousValue = mergeIterator.value();
                mergeIterator.next();
            }

            // main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
            while (mergeIterator.isValid()) {

                assert (!hasMetaDataFollowsFlag(previousKey));

                // set signal in first key byte that meta data will follow in the stream after this k/v pair
                if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {

                    // be cooperative and check for interruption from time to time in the hot loop
                    checkInterrupted();

                    setMetaDataFollowsFlagInKey(previousKey);
                }

                writeKeyValuePair(previousKey, previousValue, kgOutView);

                // write meta data if we have to
                if (mergeIterator.isNewKeyGroup()) {
                    // TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                    kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
                    // this will just close the outer stream
                    kgOutStream.close();
                    // begin new key-group
                    keyGroupRangeOffsets.setKeyGroupOffset(
                        mergeIterator.keyGroup(),
                        checkpointOutputStream.getPos());
                    // write the kv-state
                    // TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                    kgOutStream = keyGroupCompressionDecorator.decorateWithCompression(checkpointOutputStream);
                    kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                    kgOutView.writeShort(mergeIterator.kvStateId());
                } else if (mergeIterator.isNewKeyValueState()) {
                    // write the k/v-state
                    // TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
                    kgOutView.writeShort(mergeIterator.kvStateId());
                }

                // request next k/v pair
                previousKey = mergeIterator.key();
                previousValue = mergeIterator.value();
                mergeIterator.next();
            }
        }

        // epilogue: write last key-group
        if (previousKey != null) {
            assert (!hasMetaDataFollowsFlag(previousKey));
            setMetaDataFollowsFlagInKey(previousKey);
            writeKeyValuePair(previousKey, previousValue, kgOutView);
            // TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
            kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
            // this will just close the outer stream
            kgOutStream.close();
            kgOutStream = null;
        }
    } finally {
        // this will just close the outer stream
        IOUtils.closeQuietly(kgOutStream);
    }
}
```

Snapshot strategies: a snapshot() method is defined for the different snapshot strategies to implement, and the result of a snapshot is required to be of type RunnableFuture.
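Paraphrasing that contract from the calls above, this is a sketch of the interface shape, not copied verbatim from the Flink sources:

```java
import java.util.concurrent.RunnableFuture;

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateObject;

// Every strategy returns a RunnableFuture: by the time it is returned, the
// synchronous part of the snapshot has happened; running the future performs
// the asynchronous part and yields the state handle.
public interface SnapshotStrategySketch<S extends StateObject> {

    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions) throws Exception;
}
```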
The return type of StreamOperator's snapshotState method is OperatorSnapshotFutures:
```java
// StreamOperator
OperatorSnapshotFutures snapshotState(
    long checkpointId,
    long timestamp,
    CheckpointOptions checkpointOptions,
    CheckpointStreamFactory storageLocation)
```

OperatorSnapshotFutures has four fields:
| Name | Description |
| --- | --- |
| keyedStateManagedFuture | handle to keyed state managed by Flink |
| keyedStateRawFuture | handle to keyed state managed by the user (not recommended) |
| operatorStateManagedFuture | handle to operator state managed by Flink |
| operatorStateRawFuture | handle to operator state managed by the user (not recommended) |

```java
// OperatorSnapshotFutures
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;
private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;
private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
```

Next, let's see how these four fields are set.
```java
// AbstractStreamOperator
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

if (null != operatorStateBackend) {
    snapshotInProgress.setOperatorStateManagedFuture(
        operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}

if (null != keyedStateBackend) {
    snapshotInProgress.setKeyedStateManagedFuture(
        keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}

// DefaultOperatorStateBackend
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    long syncStartTime = System.currentTimeMillis();

    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
        snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

    snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
    return snapshotRunner;
}
```

The snapshot content mainly comes from registeredOperatorStates. Let's look at how registeredOperatorStates is populated.
```java
// OperatorStateRestoreOperation
public Void restore() throws Exception {

    // iterate over the stateHandles
    for (OperatorStateHandle stateHandle : stateHandles) {
        FSDataInputStream in = stateHandle.openInputStream();
        closeStreamOnCancelRegistry.registerCloseable(in);

        ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userClassloader);
            OperatorBackendSerializationProxy backendSerializationProxy =
                new OperatorBackendSerializationProxy(userClassloader);
            backendSerializationProxy.read(new DataInputViewStreamWrapper(in));

            // restoredOperatorMetaInfoSnapshots is simply the result of the read(...) above
            List<StateMetaInfoSnapshot> restoredOperatorMetaInfoSnapshots =
                backendSerializationProxy.getOperatorStateMetaInfoSnapshots();

            // iterate over restoredOperatorMetaInfoSnapshots
            for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {

                // build the restoredMetaInfo
                final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
                    new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

                if (restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                    // error handling elided
                }

                PartitionableListState<?> listState = registeredOperatorStates.get(restoredSnapshot.getName());

                if (null == listState) {
                    // if listState is null, construct a new one from restoredMetaInfo
                    listState = new PartitionableListState<>(restoredMetaInfo);
                    registeredOperatorStates.put(listState.getStateMetaInfo().getName(), listState);
                } else {
                    // compatibility checks elided
                }
            }
            // reading the actual state data is elided
        }
        // finally block elided
    }
    // ...
}
```

Creating the checkpoint output stream used to persist the checkpoint data: the stream ultimately created is a CheckpointStateOutputStream, which extends FSDataOutputStream, so the data can end up on local disk or in HDFS.
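A hypothetical usage sketch of that stream, mirroring what the snapshot callable above does (`streamFactory` is assumed to be in scope): write state bytes through the checkpoint stream, then close it to obtain a handle. Depending on the backend and the file size threshold, the handle points to a file or to in-memory bytes:

```java
CheckpointStreamFactory.CheckpointStateOutputStream out =
    streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
DataOutputView view = new DataOutputViewStreamWrapper(out);
view.writeInt(42);                                  // some state payload
StreamStateHandle handle = out.closeAndGetHandle(); // file-backed or byte-array handle
```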
Restoring a CompositeStateHandle: DefaultOperatorStateBackendBuilder.build() calls restoreOperation.restore(). OperatorStateRestoreOperation is used to restore operator state; it initializes registeredOperatorStates and registeredBroadcastStates.
Where do the stateHandles come from?

- prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState() yields the stateHandles, a collection of OperatorStateHandle (the keyed counterpart is jobManagerState.getManagedKeyedState()).
- These subtask states live in TaskStateSnapshot.subtaskStatesByOperatorID, which AsyncCheckpointRunnable.run populates with a put per operator.
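A simplified sketch of that last step; the class and method names match the Flink classes mentioned above, while `operatorID` and `finalizedSubtaskState` are stand-ins for values that AsyncCheckpointRunnable derives during the checkpoint:

```java
// AsyncCheckpointRunnable collects each operator's finalized subtask state
// into the TaskStateSnapshot that is acknowledged to the JobManager.
TaskStateSnapshot snapshot = new TaskStateSnapshot();
snapshot.putSubtaskStateByOperatorID(operatorID, finalizedSubtaskState);
```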
```java
// OperatorSnapshotFinalizer
SnapshotResult<KeyedStateHandle> keyedManaged =
    FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());

SnapshotResult<KeyedStateHandle> keyedRaw =
    FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());

SnapshotResult<OperatorStateHandle> operatorManaged =
    FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());

SnapshotResult<OperatorStateHandle> operatorRaw =
    FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());

jobManagerOwnedState = new OperatorSubtaskState(
    operatorManaged.getJobManagerOwnedSnapshot(),
    operatorRaw.getJobManagerOwnedSnapshot(),
    keyedManaged.getJobManagerOwnedSnapshot(),
    keyedRaw.getJobManagerOwnedSnapshot()
);

taskLocalState = new OperatorSubtaskState(
    operatorManaged.getTaskLocalSnapshot(),
    operatorRaw.getTaskLocalSnapshot(),
    keyedManaged.getTaskLocalSnapshot(),
    keyedRaw.getTaskLocalSnapshot()
);
```

snapshotFutures comes from operatorSnapshotsInProgress, the collection of results returned by each operator's snapshotState call:
```java
// AbstractStreamOperator
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp,
        CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {

    KeyGroupRange keyGroupRange = null != keyedStateBackend
        ? keyedStateBackend.getKeyGroupRange()
        : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

    try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables())) {

        snapshotState(snapshotContext);

        // set the KeyedStateRawFuture
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        // set the OperatorStateRawFuture
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

        if (null != operatorStateBackend) {
            // set the OperatorStateManagedFuture
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }

        if (null != keyedStateBackend) {
            // set the KeyedStateManagedFuture
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        // exception handling elided
    }

    return snapshotInProgress;
}
```
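For completeness, a sketch of how these futures are consumed later; this mirrors the FutureUtils.runIfNotDoneAndGet calls made by OperatorSnapshotFinalizer above (`snapshotInProgress` assumed in scope):

```java
// run the pending asynchronous part on the calling thread if nobody has yet,
// then read the finished SnapshotResult
RunnableFuture<SnapshotResult<OperatorStateHandle>> future =
    snapshotInProgress.getOperatorStateManagedFuture();
if (!future.isDone()) {
    future.run();
}
SnapshotResult<OperatorStateHandle> result = future.get();
```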