Flik代码示例(2):JDBC写数据到Phoenix

    xiaoxiao2022-07-13  173


    Flink: Blink分支 1.5.1  

    https://github.com/apache/flink/tree/blink


    Maven Dependency:

    <dependency>   <groupId>com.alibaba.blink</groupId>   <artifactId>flink-jdbc</artifactId>   <version>1.5.1</version> </dependency>

    代码示例:

    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat

    val tableResult = tblEnv.sqlQuery("select after.id,after.age from test where  after.id is not null")

    val result = tblEnv.toAppendStream[Row](tableResult)

    result.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()

      .setDrivername("org.apache.phoenix.jdbc.PhoenixDriver")

      .setDBUrl("jdbc:phoenix:40.125.209.102:2181:/hbase;autocommit=true")

      .setQuery("upsert INTO TEST_4  values (?,?)")

      .setBatchInterval(1)

      .finish())

    注:如果发现代码正常运行不报错,但数据没有写入,注意setBatchInterval配置,设为1即为一条数据一提交,100即为一百条数据提交一次,不够100条不提交

    最新回复(0)