Flink: Blink分支 1.5.1
https://github.com/apache/flink/tree/blink
Maven Dependency:
<dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-table_2.11</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-table-common</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-json</artifactId> <version>1.5.1</version> </dependency> <dependency> <groupId>com.alibaba.blink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.5.1</version> </dependency>
Kafka消息为Debezium (PostgreSQL connector) 产生的CDC JSON,格式如下:
{ "schema": { "type": "struct", "fields": [{ "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "id" }, { "type": "int32", "optional": true, "field": "age" }], "optional": true, "name": "postgres.public.test.Value", "field": "before" }, { "type": "struct", "fields": [{ "type": "int32", "optional": false, "field": "id" }, { "type": "int32", "optional": true, "field": "age" }], "optional": true, "name": "postgres.public.test.Value", "field": "after" }, { "type": "struct", "fields": [{ "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "string", "optional": false, "field": "db" }, { "type": "int64", "optional": true, "field": "ts_usec" }, { "type": "int64", "optional": true, "field": "txId" }, { "type": "int64", "optional": true, "field": "lsn" }, { "type": "string", "optional": true, "field": "schema" }, { "type": "string", "optional": true, "field": "table" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "boolean", "optional": true, "field": "last_snapshot_record" }], "optional": false, "name": "io.debezium.connector.postgresql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }], "optional": false, "name": "postgres.public.test.Envelope" }, "payload": { "before": null, "after": { "id": 452609, "age": 3 }, "source": { "version": "0.8.3.Final", "name": "postgres", "db": "postgres", "ts_usec": 1558601374145332000, "txId": 21371566, "lsn": 418263044043, "schema": "public", "table": "test", "snapshot": false, "last_snapshot_record": null }, "op": "c", "ts_ms": 1558601374138 } }
示例代码:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Schema}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
object KafkaConsumerDemo {

  // Row type of a Debezium change-record image ("before"/"after").
  // Per the Debezium value schema: id is int32 (required), age is int32 (optional).
  private val changeRowType: TypeInformation[Row] = Types.ROW(
    Array[String]("id", "age"),
    Array[TypeInformation[_]](Types.INT, Types.INT))

  // Debezium "source" metadata block: ts_usec/txId/lsn are int64,
  // snapshot/last_snapshot_record are booleans, everything else is a string.
  private val sourceRowType: TypeInformation[Row] = Types.ROW(
    Array[String]("version", "name", "db", "ts_usec", "txId", "lsn",
      "schema", "table", "snapshot", "last_snapshot_record"),
    Array[TypeInformation[_]](
      Types.STRING, Types.STRING, Types.STRING,
      Types.LONG, Types.LONG, Types.LONG,
      Types.STRING, Types.STRING, Types.BOOLEAN, Types.BOOLEAN))

  // Leaf descriptor inside schema.fields[*].fields[*]: { type, optional, field }.
  // "optional" is a JSON boolean, not a string.
  private val leafDescriptorType: TypeInformation[Row] = Types.ROW(
    Array[String]("type", "optional", "field"),
    Array[TypeInformation[_]](Types.STRING, Types.BOOLEAN, Types.STRING))

  // Entry of schema.fields[*]: { type, fields[], optional, name, field }.
  private val fieldDescriptorType: TypeInformation[Row] = Types.ROW(
    Array[String]("type", "fields", "optional", "name", "field"),
    Array[TypeInformation[_]](
      Types.STRING,
      ObjectArrayTypeInfo.getInfoFor(classOf[Array[Row]], leafDescriptorType),
      Types.BOOLEAN, Types.STRING, Types.STRING))

  // Top-level "schema" envelope: { type, fields[], optional, name }.
  private val schemaEnvelopeType: TypeInformation[Row] = Types.ROW(
    Array[String]("type", "fields", "optional", "name"),
    Array[TypeInformation[_]](
      Types.STRING,
      ObjectArrayTypeInfo.getInfoFor(classOf[Array[Row]], fieldDescriptorType),
      Types.BOOLEAN, Types.STRING))

  // Top-level "payload" envelope: { before, after, source, op, ts_ms }.
  private val payloadType: TypeInformation[Row] = Types.ROW(
    Array[String]("before", "after", "source", "op", "ts_ms"),
    Array[TypeInformation[_]](
      changeRowType, changeRowType, sourceRowType, Types.STRING, Types.LONG))

  /**
   * Consumes Debezium CDC JSON messages from the Kafka topic
   * `postgres.public.test`, registers them as table `test`, and prints the
   * `after.id` / `after.age` columns of every event that carries an "after"
   * image (inserts/updates; deletes have a null "after").
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tblEnv = TableEnvironment.getTableEnvironment(env)

    tblEnv.connect(new Kafka()
        .version("0.10")
        .topic("postgres.public.test")
        .property("zookeeper.connect", "BigData-Dev-1:2181")
        .property("bootstrap.servers", "BigData-Dev-1:9092")
        .startFromLatest())
      // The JSON format schema is derived from the table schema declared below,
      // so the declared field types must match the actual JSON value types.
      .withFormat(new Json().deriveSchema())
      .withSchema(new Schema()
        .field("schema", schemaEnvelopeType)
        .field("payload", payloadType))
      .inAppendMode()
      .registerTableSource("test")

    // NOTE(review): relies on Blink resolving the nested column "after" without
    // the "payload." prefix — confirm; standard Flink SQL needs payload.after.id.
    val tableResult = tblEnv.sqlQuery("select after.id,after.age from test where after.id is not null")
    tblEnv.toAppendStream[Row](tableResult).print()
    env.execute()
  }
}
测试: