七、Flink入门--状态管理

    xiaoxiao2022-07-03  132

    Flink状态管理

    1.状态概述1.1什么是状态?1.2为什么要管理状态?1.3理想的状态管理 2.状态类型与使用示例2.1 Keyed State & Operator State2.2 Managed State & Raw State2.3 Keyed State的使用2.4 Keyed State 使用实例 3.容错与状态恢复3.1 状态的保存与恢复3.2 状态存储的方式

    1.状态概述

    1.1什么是状态?

    无状态计算指的是数据进入Flink后经过算子时只需要对当前数据进行处理就能得到想要的结果,有状态计算就是需要和历史的一些状态或进行相关操作,才能计算出正确的结果 状态的使用场景:

    去重,需要记录哪些数据出现过,哪些没出现过窗口计算,比如计算一个小时的counter机器学习,训练模型访问历史数据,需要与昨天的数据进行对比,历史数据放在状态里
    1.2为什么要管理状态?

    流式作业的特点是7*24小时运行,数据不重复消费,不丢失,保证只计算一次,数据实时产出不延迟,但是当状态很大,内存容量限制,或者实例运行奔溃,或需要扩展并发度等情况下,如何保证状态正确的管理,在任务重新执行的时候能正确执行,状态管理就显得尤为重要。

    1.3理想的状态管理

    理想的状态管理是:

    易用,flink提供了丰富的数据结构,简洁易用的接口高效,flink对状态的处理读写快,可以横向扩展,保存状态不影响计算性能可靠,flink对状态可以做持久化,而且可以保证exactly-once语义

    2.状态类型与使用示例

    Flink中有两种基本的State: Keyed State 和 Managed State

    2.1 Keyed State & Operator State
    keyed stateoperator state适用场景只能应用在KeyedSteam上可以用于所有的算子state处理方式每个key 对应一个 state,一个operator处理多个key ,会访问相应的多个state一个operator对应一个state并发改变并发改变时,state随着key在实例间迁移并发改变时需要你选择分配方式,内置:1.均匀分配 2.所有state合并后再分发给每个实例访问方式通过RuntimeContext访问,需要operator是一个richFunction需要你实现CheckPointedFunction或ListCheckPointed接口支持数据结构ValuedState,ListState,Reducing State,Aggregating State,MapState,FoldingState(1.4弃用)只支持 listState
    2.2 Managed State & Raw State
    Managed StateRaw State状态管理方式Flink Runtime 管理,自动存储,自动恢复,内存管理方式上优化明显用户自己管理,需要用户自己序列化状态数据结构已知的数据结构 value , list ,mapflink不知道你存的是什么结构,都转换为二进制字节数据[]使用场景大多数场景适用需要满足特殊业务,自定义operator时使用,flink满足不了你的需求时候,使用复杂
    2.3 Keyed State的使用
    接口状态数据类型访问接口ValueState单个值update/getMapStateMapput/putAll/remove/contains/entries/iterator/keys/valuesListStateListadd/addAll/update/getReducingState单个值add/addAll/update/getAggregatingState单个值add IN类型,get Out 类型
    2.4 Keyed State 使用实例
    public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { /** * 第一个属性是count, 第二个属性是 sum. */ private transient ValueState<Tuple2<Long, Long>> sum; @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { //获取当前状态 Tuple2<Long, Long> currentSum = sum.value(); // count +1 currentSum.f0 += 1; // sum求和 currentSum.f1 += input.f1; // 更新状态 sum.update(currentSum); //如果count到达2, 保存 count和平均值,清除之前的状态 if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // 状态名称 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 类型 Tuple2.of(0L, 0L)); // 默认状态 sum = getRuntimeContext().getState(descriptor); } } // 假设我们有一个 StreamExecutionEnvironment类型的变量 env env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(0) .flatMap(new CountWindowAverage()) .print(); // 输出结果为 (1,4) and (1,5)

    3.容错与状态恢复

    3.1 状态的保存与恢复

    Flink中采用checkpoint机制,定时对状态进行分布式快照备份,发生故障是任务可以从最后一个成功的checkpoint处重新开始处理。前提是数据源支持重读。checkpoint可以支持exactly-once和at-least-once语义。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000);//checkpoint周期 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//EXACTLY_ONCE或AT_LEAST_ONCE env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//最小暂停 env.getCheckpointConfig().setCheckpointTimeout(60000);//超时时间 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//最多一个checkpoint在运行 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//cancel或delete时checkpoint

    flink任务在tm或jm failoover的时候会自动从checkpoint恢复之前的状态,继续计算。当然也可以人工干预用savepoint方式对任务进行调整重启。

    3.2 状态存储的方式
    可选方式存储方式容量限制使用场景MemoryStateBackendstate存在tm内存中,checkpoint存在jm中单个state默认5M,总大小不超过jm内存,可以在构造函数中传参调大本地测试,不推荐生产FsStateBackendstate存内存,checkpoint存在外部存储如hdfs s3state总量不要超过tm内存,外存磁盘大小常规任务,分钟,join,需要开启ha的任务,可在生产使用RocksDbBackendstate存在tm上的kv数据库中单个key最大2G超大作业,天级窗口
    最新回复(0)