Building a Streaming Cube in Kylin from Kafka Data


Before the article begins, a note on the environment: the Kafka data that Kylin consumes here originates in an Oracle database and is delivered as JSON to Kafka topics by OGG For Bigdata. On delivery, inserts and the post-update (after) images of updates go to a topic named ZTVOUCHER_INS, while deletes and the pre-update (before) images go to a topic named ZTVOUCHER_DEL. I described this delivery setup in detail in an earlier article, "OGG For Bigdata 12: syncing Oracle data to different Kafka topics by operation type"; this post focuses on how Kylin consumes the data to build a streaming cube.
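As a quick illustration of that routing (my sketch, not part of the original setup; the kafka-python library and the broker address are assumptions), a consumer watching both topics would look like this:

    # Sketch only: watch both OGG target topics and print each record's
    # real operation type and commit timestamp.
    # Assumes kafka-python and a broker on localhost:9092.
    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "ZTVOUCHER_INS",   # inserts + post-update (after) images
        "ZTVOUCHER_DEL",   # deletes + pre-update (before) images
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )

    for msg in consumer:
        rec = msg.value
        print(msg.topic, rec["tokens"]["TKN-OP-TYPE"], rec["op_ts"])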

I. DML Operations on the Source

1. The source table ztvoucher is currently empty. Run an insert and query the result:

    SQL> insert into ztvoucher (MANDT, GJAHR, BUKRS, BELNR, BUZEI, MONAT, BUDAT, HKONT, DMBTR, ZZ0014)
         values ('666', '2222', '3432', '2200001414', '001', '01', '20190101', '9101000000', 100.00, '101');

    1 row created.

    SQL> commit;

    Commit complete.

    SQL> alter system switch logfile;

    System altered.

    SQL> select * from ztvoucher;

    MANDT  GJAHR  BUKRS  BELNR       BUZEI  MONAT  BUDAT     HKONT       DMBTR   ZZ0014
    666    2222   3432   2200001414  001    01     20190101  9101000000  100.00  101

2. Check Kafka:

    [root@hadoop kafka]# ./console.sh input topic:ZTVOUCHER_INS
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    {"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

The insert made on the source has indeed arrived in topic ZTVOUCHER_INS.
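One detail worth calling out: as the later outputs will show, op_type stays "I" even for updates and deletes, so the real operation has to be read from the TKN-OP-TYPE token. A small parsing sketch (mine, not from the original article) over the insert message above:

    # Sketch: extract the useful fields from the OGG JSON record shown above.
    import json

    raw = '''{"table":"SCOTT.ZTVOUCHER_INS","op_type":"I",
    "op_ts":"2019-05-22 16:18:58.353767",
    "current_ts":"2019-05-22T16:19:11.352000",
    "pos":"00000000080000012086",
    "tokens":{"TKN-OP-TYPE":"INSERT"},
    "after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414",
    "BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000",
    "DMBTR":100.00,"ZZ0014":"101"}}'''

    rec = json.loads(raw)
    print(rec["tokens"]["TKN-OP-TYPE"])  # -> INSERT (the real operation)
    print(rec["op_ts"])                  # -> commit time, usable as event time
    print(rec["after"]["DMBTR"])         # -> 100.0 (column values sit under "after")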

3. Run an update on the source:

    update ztvoucher set dmbtr=50 where mandt='666';
    commit;
    alter system switch logfile;

4. Check Kafka again. First, the contents of ZTVOUCHER_INS:

    [root@hadoop kafka]# ./console.sh input topic:ZTVOUCHER_INS
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    {"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:18:58.353767","current_ts":"2019-05-22T16:19:11.352000","pos":"00000000080000012086","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}
    {"table":"SCOTT.ZTVOUCHER_INS","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:33.799000","pos":"00000000080000012613","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

Besides the earlier insert, the post-update (after) image has now arrived as well. Next, look at ZTVOUCHER_DEL:

    [root@hadoop kafka]# ./console.sh input topic:ZTVOUCHER_DEL
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    {"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}

So the DEL topic has also received the pre-update (before) image, as sketched below.
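Since the update's after image (DMBTR 50.00) sits in ZTVOUCHER_INS and its before image (DMBTR 100.00) in ZTVOUCHER_DEL, joining the two on the row's key shows exactly what changed. A minimal sketch, assuming MANDT/BELNR/BUZEI identify a row (the real primary key may differ):

    # Sketch: pair the update's before image (DEL topic) with its after
    # image (INS topic) and report the changed columns.
    before = {"MANDT": "666", "BELNR": "2200001414", "BUZEI": "001", "DMBTR": 100.00}
    after  = {"MANDT": "666", "BELNR": "2200001414", "BUZEI": "001", "DMBTR": 50.00}

    key = ("MANDT", "BELNR", "BUZEI")  # assumed business key
    assert all(before[k] == after[k] for k in key)

    changed = {c: (before[c], after[c]) for c in after if before[c] != after[c]}
    print(changed)  # -> {'DMBTR': (100.0, 50.0)}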

5. Run a delete on the source:

    delete from ztvoucher where mandt='666';
    commit;
    alter system switch logfile;

6. Check Kafka:

    [root@hadoop kafka]# ./console.sh input topic:ZTVOUCHER_DEL
    Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
    {"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:22:48.354189","current_ts":"2019-05-22T16:23:23.781000","pos":"00000000080000012345","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":100.00,"ZZ0014":"101"}}
    {"table":"SCOTT.ZTVOUCHER_DEL","op_type":"I","op_ts":"2019-05-22 16:26:26.353705","current_ts":"2019-05-22T16:27:15.049000","pos":"00000000080000012857","tokens":{"TKN-OP-TYPE":"DELETE"},"after":{"MANDT":"666","GJAHR":"2222","BUKRS":"3432","BELNR":"2200001414","BUZEI":"001","MONAT":"01","BUDAT":"20190101","HKONT":"9101000000","DMBTR":50.00,"ZZ0014":"101"}}

Besides the pre-update image seen earlier, the record from the delete we just ran has been written as well; the sketch below confirms the end state.
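As a sanity check that the two topics together capture the full history, replaying all five messages in op_ts order rebuilds the table's current state, which after the delete is empty again. A hedged sketch, again assuming MANDT/BELNR/BUZEI as the row key:

    # Sketch: fold the OGG records (sorted by commit time) into the table's
    # current state. DEL-topic before images carry no state change; the
    # DELETE token removes the row, and INS-topic records upsert it.
    def replay(records):
        state = {}
        for rec in sorted(records, key=lambda r: r["op_ts"]):
            row = rec["after"]
            key = (row["MANDT"], row["BELNR"], row["BUZEI"])
            if rec["tokens"]["TKN-OP-TYPE"] == "DELETE":
                state.pop(key, None)
            elif rec["table"].endswith("ZTVOUCHER_INS"):
                state[key] = row  # insert, or update after image
        return state              # -> {} for the insert/update/delete above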

With the data in place, we can move on to building the cube from the stream.

II. Building the Streaming Cube

The official guide for streaming cube builds is here (I used version 2.4): http://kylin.apache.org/cn/docs24/tutorial/cube_streaming.html

A streaming cube build requires a column of type timestamp that identifies each message's event time. As the JSON in the two topics above shows, the op_ts field satisfies this requirement; a quick check follows below.
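An illustrative check (my sketch) that op_ts parses cleanly as a timestamp with microsecond precision, which is what makes it usable as the cube's time column:

    # Sketch: confirm op_ts parses as a timestamp (microsecond precision).
    from datetime import datetime

    op_ts = "2019-05-22 16:18:58.353767"  # value taken from the messages above
    ts = datetime.strptime(op_ts, "%Y-%m-%d %H:%M:%S.%f")
    print(ts.isoformat())  # -> 2019-05-22T16:18:58.353767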

1. Define a table from the JSON data

First, build