Flink代码示例(1):消费Kafka Demo

    xiaoxiao2022-07-12  147


    Flink: Blink分支 1.5.1  

    https://github.com/apache/flink/tree/blink


    Maven Dependency:

    <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-scala_2.11</artifactId>   <version>1.5.1</version> </dependency> <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-streaming-scala_2.11</artifactId>   <version>1.5.1</version> </dependency> <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-table_2.11</artifactId>   <version>1.5.1</version> </dependency> <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-table-common</artifactId>   <version>1.5.1</version> </dependency> <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-json</artifactId>   <version>1.5.1</version> </dependency> <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>   <version>1.5.1</version> </dependency>

    Kafka数据类型为Json,格式如下:

    {     "schema": {         "type": "struct",         "fields": [{             "type": "struct",             "fields": [{                 "type": "int32",                 "optional": false,                 "field": "id"             }, {                 "type": "int32",                 "optional": true,                 "field": "age"             }],             "optional": true,             "name": "postgres.public.test.Value",             "field": "before"         }, {             "type": "struct",             "fields": [{                 "type": "int32",                 "optional": false,                 "field": "id"             }, {                 "type": "int32",                 "optional": true,                 "field": "age"             }],             "optional": true,             "name": "postgres.public.test.Value",             "field": "after"         }, {             "type": "struct",             "fields": [{                 "type": "string",                 "optional": true,                 "field": "version"             }, {                 "type": "string",                 "optional": false,                 "field": "name"             }, {                 "type": "string",                 "optional": false,                 "field": "db"             }, {                 "type": "int64",                 "optional": true,                 "field": "ts_usec"             }, {                 "type": "int64",                 "optional": true,                 "field": "txId"             }, {                 "type": "int64",                 "optional": true,                 "field": "lsn"             }, {                 "type": "string",                 "optional": true,                 "field": "schema"             }, {                 "type": "string",                 "optional": true,                 "field": "table"             }, {                 "type": "boolean",                 "optional": true,                 "default": false,                 "field": "snapshot"             }, {                 "type": "boolean",                 "optional": true,                 "field": "last_snapshot_record"             }],             "optional": false,             "name": "io.debezium.connector.postgresql.Source",             "field": "source"         }, {             "type": "string",             "optional": false,             "field": "op"         }, {             "type": "int64",             "optional": true,             "field": "ts_ms"         }],         "optional": false,         "name": "postgres.public.test.Envelope"     },     "payload": {         "before": null,         "after": {             "id": 452609,             "age": 3         },         "source": {             "version": "0.8.3.Final",             "name": "postgres",             "db": "postgres",             "ts_usec": 1558601374145332000,             "txId": 21371566,             "lsn": 418263044043,             "schema": "public",             "table": "test",             "snapshot": false,             "last_snapshot_record": null         },         "op": "c",         "ts_ms": 1558601374138     } }

    示例代码:

    import org.apache.flink.api.common.typeinfo.TypeInformation

    import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    import org.apache.flink.table.api.{TableEnvironment, Types}

    import org.apache.flink.table.descriptors.{Json, Kafka, Schema}

    import org.apache.flink.types.Row

    import org.apache.flink.streaming.api.scala._

     

    object KafkaConsumerDemo {

      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val tblEnv = TableEnvironment.getTableEnvironment(env)

        tblEnv.connect(new Kafka()

          .version("0.10")

          .topic("postgres.public.test")

          .property("zookeeper.connect", "BigData-Dev-1:2181")

          .property("bootstrap.servers","BigData-Dev-1:9092")

          .startFromLatest()

        ).withFormat(new Json().deriveSchema())

          .withSchema(new Schema().field("schema",Types.ROW(

            Array("type","fields","optional","name"),

            Array[TypeInformation[_]](Types.STRING,ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,

              Types.ROW(Array("type","fields","optional","name","field"),Array[TypeInformation[_]](

                Types.STRING,

                ObjectArrayTypeInfo.getInfoFor(new Array[Row](0).getClass,Types.ROW(Array("type","optional","field"),Array[TypeInformation[_]](Types.STRING,Types.STRING,Types.STRING)))

                ,Types.STRING,Types.STRING,Types.STRING)

              )),Types.STRING,Types.STRING)

          )).field("payload", Types.ROW(

            Array[String]("before", "after", "source", "op", "ts_ms"),

            Array[TypeInformation[_]](

              Types.ROW(Array[String]("id", "age"), Array[TypeInformation[_]](Types.STRING, Types.STRING)),

              Types.ROW(Array[String]("id", "age"), Array[TypeInformation[_]](Types.STRING, Types.STRING)),

              Types.ROW(Array[String]("version", "name", "db", "ts_usec", "txId", "lsn", "schema", "table", "snapshot", "last_snapshot_record"), Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)),

              Types.STRING, Types.STRING))))

          .inAppendMode()

          .registerTableSource("test")

        val tableResult = tblEnv.sqlQuery("select after.id,after.age from test where  after.id is not null")

        tblEnv.toAppendStream[Row](tableResult).print()

        env.execute()

      }

    }

    测试:

     

     

    最新回复(0)