??????????????flink-cdc,??????????kafka,????format ????canal-json????????
------------------ ???????? ------------------
发件人:
"user-zh"
<[email protected]>;
发送时间: 2021年4月21日(星期三) 下午2:16
收件人: "user-zh"<[email protected]>;
????: Re: flink sql ?????? mysql cdc ?? canal json ????????kafka????
Hi casel.
flink-cdc-connectors 是集成的 Debezium 引擎，应该不支持 Canal 格式。
https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
casel.chen <[email protected]> 于2021年4月20日周二 下午6:18写道：
> ????????flink????????????canal server??????
>
>
> CREATE TABLE `binlog_table` (
>
>
`id` INT,
>
>
`name` STRING,
>
>
`sys_id` STRING,
>
>
`sequence` INT,
>
>
`filter` STRING,
>
>
`tag` STRING,
>
>
`remark` STRING,
>
>
`create_date` TIMESTAMP,
>
>
`update_date` TIMESTAMP,
>
>
`reserve` STRING,
>
>
`sys_name` STRING,
>
>
`metric_seq` INT,
>
>
`advanced_function` STRING,
>
>
`value_type` STRING,
>
>
`value_field` STRING,
>
>
`status` INT,
>
>
`syn_date` TIMESTAMP,
>
>
`confirmer` STRING,
>
>
`confirm_time` TIMESTAMP,
>
>
`index_explain` STRING,
>
>
`field_name` STRING,
>
>
`tag_values` STRING,
>
>
PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
> 'connector' = 'mysql-cdc',
>
> 'hostname' = '${mysql.hostname}',
>
> 'port' = '3306',
>
> 'username' = '${mysql.username}',
>
> 'password' = '${mysql.password}',
>
> 'database-name' = '${mysql.database}',
>
> 'table-name' = '${mysql.table}'
>
> );
>
>
>
>
> CREATE TABLE `kafka_sink` (
>
>
`id` INT,
>
>
`name` STRING,
>
>
`sys_id` STRING,
>
>
`sequence` INT,
>
>
`filter` STRING,
>
>
`tag` STRING,
>
>
`remark` STRING,
>
>
`create_date` TIMESTAMP,
>
>
`update_date` TIMESTAMP,
>
>
`reserve` STRING,
>
>
`sys_name` STRING,
>
>
`metric_seq` INT,
>
>
`advanced_function` STRING,
>
>
`value_type` STRING,
>
>
`value_field` STRING,
>
>
`status` INT,
>
>
`syn_date` TIMESTAMP,
>
>
`confirmer` STRING,
>
>
`confirm_time` TIMESTAMP,
>
>
`index_explain` STRING,
>
>
`field_name` STRING,
>
>
`tag_values` STRING,
>
>
PRIMARY KEY (`id`) NOT ENFORCED
>
> ) WITH (
>
> 'connector' = 'kafka',
>
> 'topic' = '${topic}',
>
> 'properties.bootstrap.servers' =
'${bootstrap.servers}',
>
> 'format' = 'canal-json'
>
> );
>
>
>
>
> INSERT INTO `kafka_sink`
>
> (SELECT *
>
> FROM `binlog_table`);
>
> ????????????????:
>
>
> {
> "data": [
> {
> "id": 3,
> "name": "????????????BuyETC????",
> "sys_id": "0184",
> "sequence": 2,
> "filter": "??a=1??",
> "tag": "MerId(??????)",
> "remark": "O",
> "create_date": "2020-11-02 15:01:31",
> "update_date": "2021-04-07 09:23:59",
> "reserve": "",
> "sys_name": "NHL",
> "metric_seq": 0,
> "advanced_function": "",
> "value_type": "sum",
> "value_field": "value",
> "status": 1,
> "syn_date": "2021-01-28 19:31:36",
> "confirmer": null,
> "confirm_time": null,
> "index_explain": "aa",
> "field_name": null,
> "tag_values": null
> }
> ],
> "type": "INSERT"
> }
> ????????????canal json??????????upsert-kafka connector??????????
>
>
>
> CREATE TABLE `kafka_sink` ( `id` INT, `name` STRING, `sys_id` STRING,
> `sequence` INT, `filter` STRING, `tag` STRING, `remark` STRING,
> `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING,
> `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING,
> `value_type` STRING, `value_field` STRING, `status` INT, `syn_date`
> TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP, `index_explain`
> STRING, `field_name` STRING, `tag_values` STRING, PRIMARY KEY (`id`) NOT
> ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = '${topic}',
> 'properties.bootstrap.servers' = '${bootstrap.servers}', 'key.format' =
> 'json',
> 'value.format' = 'json' );
>
>
> ????????????????
>
>
>
>
{"id":9330,"name":"????????????????????????????????00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06
> 12:27:30","update_date":"2021-04-06
>
12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07
>
16:47:59","confirmer":null,"confirm_time":null,"index_explain":"????????????????????????????????00010017","field_name":null,"tag_values":null}
>
>
>