Flink在流处理程序中支持不同的时间概念。 首先需要理解三种time的含义:
Processing time: Processing time refers to the system time of the machine that is executing the respective operation. 处理事件时的时间,处理简单,结果不确定Event time: Event time is the time that each individual event occurred on its producing device. 事件产生的时间,处理复杂,结果确定可以重现Ingestion time: Ingestion time is the time that events enter Flink. 事件进入Flink的时间首先定义你的流处理程序是已哪一种时间为标志的。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);在上述例子中为了使用eventTime作为时间处理标志,必须在数据源中定义eventTIme并自动发出watermark或者程序在数据源定义之后注入一个TimeStamp Assigner 和 Watermark Generator 其中的一个。 在流处理程序中需要有一种方法能衡量数据处理的进度,例如统计每小时的窗口操作,需要在一小时后通知operator,并将窗口关闭。在flink中衡量event time进度的机制是watermark, watermark作为数据流的一部分流动,并带有时间戳,watermark t表示不会再有比t小的时间的数据到达,下图展示了一个有序的流watermark watermark对于无序流至关重要,因为eventTime不按照时间戳排序,所以watermark声明当前时间戳的时间都已到达,opeator会将其内部的时间提前到watermark的值。
首先我们来看下TimeStamp的分配和WaterMark的生成,总共有2中方法: 1.在SourceFunction中产生,相当于将流的源头定义
collectWithTimeStamp(T element, long timestamp) 参数:发送的数据,timestampemitWaterMark(WaterMark mark)2.Time Assigner 在DataStream Api 流程中调用 DataStream.assignTimeStampsAndWaterMarks(),有2中具体的实现方法如下:
定期生成根据特殊记录生成现实时间驱动数据驱动每隔一段时间调用生成每一次分配TimeStamp都会调用方法实现AssignerWithPeriodicWatermarks实现AssignerWithPunctuatedWaterMarkswatermark的传播有以下几个特点:
watermark以广播的形式在算子之间传播收到Long.MAX_VALUE表示不会再有数据收到,终止的意思单输入流取最大的Watermark,多输入的流取最小的watermarkwatermark单个输入取最大的,对于整个任务取最小的:
对于同一个流,例如从kafka消费数据,多个partition,flink会进行强制的同步时钟;但是对于不同流之间,join union操作,这时候快的流要等慢的流,这时候快的流就要在状态中缓存很大的数据去等待慢的流,这是一个很大的资源消耗。在讲watermark的处理之前,先讲一下ProcessFunction,因为watermark处理的外部逻辑是通过ProcessFunction来实现。ProcessFunction能实现时间相关的功能主要有三点:
获取记录的TimeStamp 或 ProcTime获取算子时间(或者理解为获取watermark时间)注册Timer并提供回调逻辑 registerEventTimeTimer()registerProcessingTimeTimer()onTime() 处理逻辑 所有的时间逻辑都是由TimeService来完成的接下来看下当一个算子收到watermark时都要干些什么事情呢? 1.更新算子时间 2.遍历计时器队列触发回调 3.将watermark发送到下游
这样上述2.1-2.3就完成了watermark的生成到传播到计算再到传播,形成一个完整的闭环。
首先看一下ProcessingTime的在Table api和sql中的使用 ProcessingTime:
从DataStream转化通过TableSource生成tEnv.fromDataStream(stream,“f1,f2,f3.proctime”)TableSource实现DefinedProcTimeAttributes接口注意新加的processTime必须是最后一个字段
EventTime:
从DataStream转化通过TableSource生成原始DataSteam必须有TimeStamp和watermark数据中存在long或timestamp类型的字段tEnv.fromDataStream(stream,“f1,f2,f3.rowtime”)TableSource实现DefinedRowTimeAttributes接口