????????????flink-cdc????????SourceRecord??SourceRecord??????topic??????
??????????Debezium
????????mysql-conector????????????????????kafka-connector????????????????topic??????????????????????????????????
?????? ??????????+??????+????????????????????topic??????????????????
????????????????????????SourceRecord??????topic????????????
------------------ ???????? ------------------
发件人:
"user-zh"
<[email protected]>;
发送时间: 2021年4月22日(星期四) 下午2:32
收件人: "user-zh"<[email protected]>;
????: Re:??????flink sql cdc????kafka????????????????????
????????????flink
cdc????debezium??????????????????????????????????????????debezium????canal????????????????????????flink
cdc??????????????????????????????????????????????flink cdc????????????????????
?? 2021-04-22 11:01:22??"????" <[email protected]> ??????
??????????????????flink??????????????????????????????????????????????????????????debezium????canal??????????????kafka,
????canal????????????after
??????????????????????????????????????????debezium????????????????flink-cdc??????????debezium??????????????????????????????record
------------------ ???????? ------------------
发件人: "user-zh" <[email protected]>;
发送时间: 2021年4月22日(星期四) 上午9:41
收件人: "[email protected]"<[email protected]>;
????: flink sql cdc????kafka????????????????????
????????????????flink????mysql binlog????????????????????kafka(??????canal
server??debezium????)??????kafka?????????? before, after, op_type, ts,
database, table
??????????????????????????????????????kafka????????data??op_type????????????????????????????????????debezium(flink
cdc??????debezium????????????????record??????????data??op_type????????????????????????????????????????????????
-- Source table: exposes one MySQL table as a changelog stream through the
-- `mysql-cdc` connector. The ${...} tokens are template placeholders,
-- presumably substituted before the script is submitted (confirm with the
-- deployment tooling).
CREATE TABLE `binlog_table` (
    `id`                INT,
    `name`              STRING,
    `sys_id`            STRING,
    `sequence`          INT,
    `filter`            STRING,
    `tag`               STRING,
    `remark`            STRING,
    `create_date`       TIMESTAMP,
    `update_date`       TIMESTAMP,
    `reserve`           STRING,
    `sys_name`          STRING,
    `metric_seq`        INT,
    `advanced_function` STRING,
    `value_type`        STRING,
    `value_field`       STRING,
    `status`            INT,
    `syn_date`          TIMESTAMP,
    `confirmer`         STRING,
    `confirm_time`      TIMESTAMP,
    `index_explain`     STRING,
    `field_name`        STRING,
    `tag_values`        STRING,
    -- Declared as metadata only: NOT ENFORCED means Flink does not validate
    -- uniqueness of `id` itself.
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = '${mysql.hostname}',
    'port'          = '3306',
    'username'      = '${mysql.username}',
    'password'      = '${mysql.password}',
    'database-name' = '${mysql.database}',
    'table-name'    = '${mysql.table}'
);
-- Sink table: writes rows to the Kafka topic named by ${topic}, serialized
-- with the `canal-json` format. The column list matches `binlog_table`
-- one-for-one (same names, types and order); no primary key is declared here.
CREATE TABLE `kafka_sink` (
    `id`                INT,
    `name`              STRING,
    `sys_id`            STRING,
    `sequence`          INT,
    `filter`            STRING,
    `tag`               STRING,
    `remark`            STRING,
    `create_date`       TIMESTAMP,
    `update_date`       TIMESTAMP,
    `reserve`           STRING,
    `sys_name`          STRING,
    `metric_seq`        INT,
    `advanced_function` STRING,
    `value_type`        STRING,
    `value_field`       STRING,
    `status`            INT,
    `syn_date`          TIMESTAMP,
    `confirmer`         STRING,
    `confirm_time`      TIMESTAMP,
    `index_explain`     STRING,
    `field_name`        STRING,
    `tag_values`        STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = '${topic}',
    'properties.bootstrap.servers' = '${bootstrap.servers}',
    'format'                       = 'canal-json'
);
-- Continuously copy every change captured by `binlog_table` into `kafka_sink`.
--
-- The projection is spelled out instead of using SELECT * because the insert
-- is positional: with SELECT *, a column added to (or reordered in)
-- `binlog_table` would either fail validation or shift values into the wrong
-- sink column. The columns below follow the order declared in `kafka_sink`.
INSERT INTO `kafka_sink`
SELECT
    `id`,
    `name`,
    `sys_id`,
    `sequence`,
    `filter`,
    `tag`,
    `remark`,
    `create_date`,
    `update_date`,
    `reserve`,
    `sys_name`,
    `metric_seq`,
    `advanced_function`,
    `value_type`,
    `value_field`,
    `status`,
    `syn_date`,
    `confirmer`,
    `confirm_time`,
    `index_explain`,
    `field_name`,
    `tag_values`
FROM `binlog_table`;