Flink入门(十一) state状态update更新

    xiaoxiao2022-07-13  145

    有这样需求,统计每个小时,一个线路被点击次数, 需要每30秒统计一次写入redis,采用Slide窗口滑动,但发现一个问题,当key没有新的消息消费, 当(key,value)的value不变的,滑动统计还是会输出统计值,这个key的value就没必须要写入redis。 所以采用ValueState的update更新,

    代码如下:

    DataStream<Tuple2<String, Integer>> result = exposure.map(new MapFunction<PlanDetailBO, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(PlanDetailBO value) throws Exception { String durationData = value.getFirstFromDate(); String firstTrafficCode = value.getFirstNo(); String secondTrafficCode = value.getSecondNo(); String startStationCode = value.getFirstFromStationCode(); String transferArriveStationCode = value.getFirstToStationCode(); String transferLeaveStationCode = value.getSecondFromStationCode(); String endStationCode = value.getSecondToStationCode(); String key = String.format("%s+%s+%s+%s+%s+%s+%s", durationData, firstTrafficCode, secondTrafficCode, startStationCode, transferArriveStationCode, transferLeaveStationCode, endStationCode); return Tuple2.of(key, 1); } }).keyBy(s -> s.f0).window(SlidingEventTimeWindows.of(Time.seconds(60*60), Time.seconds(30))).sum(1);

    统计结果,过去一个小时,每30秒触发一次,无论有没有新的key下新消息,都将30秒统计输出。 所以采用定义ValueState update更新

    DataStream<Tuple2<String, Integer>> updateResult = result.keyBy(0).map(new RichMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { //记录统计值 private transient ValueState<Integer> counts; @Override public void open(Configuration parameters) throws Exception { //设置ValueState的TTL的生命周期为1个小时,自动会清除ValueState的里内容 StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.minutes(60)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build(); //设置ValueState的默认值 ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("plan_num", Integer.class); descriptor.enableTimeToLive(ttlConfig); counts = getRuntimeContext().getState(descriptor); super.open(parameters); } @Override public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception { Integer num = value.f1; String key = value.f0+"status"; //如果统计值没有发生变化就输出null if (num == counts.value()) { return Tuple2.of(key, null); } counts.update(num); return Tuple2.of(key, num); } }).filter(s->s.f1!=null); //过滤null只输出变化的数值 updateResult.print(); //写入redis,略

    这样就可以完成我们所需要的update更新。

    最新回复(0)