我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink
cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。
在 2021-04-22 11:01:22,"飞翔" <[email protected]> 写道:
既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka,
比如canal的样例,虽然after
不是很全,你可以自己去构造补全,这样你采用debezium不就好了,也就是flink-cdc为什么集成debezium的原因,更新前后都是一个完整的record
------------------ 原始邮件 ------------------
发件人: "user-zh" <[email protected]>;
发送时间: 2021年4月22日(星期四) 上午9:41
收件人: "[email protected]"<[email protected]>;
主题: flink sql cdc发到kafka消息表名信息缺失问题
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal
server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table
这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink
cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢?
CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter` STRING,
`tag` STRING,
`remark` STRING,
`create_date` TIMESTAMP,
`update_date` TIMESTAMP,
`reserve` STRING,
`sys_name` STRING,
`metric_seq` INT,
`advanced_function` STRING,
`value_type` STRING,
`value_field` STRING,
`status` INT,
`syn_date` TIMESTAMP,
`confirmer` STRING,
`confirm_time` TIMESTAMP,
`index_explain` STRING,
`field_name` STRING,
`tag_values` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '${mysql.hostname}',
'port' = '3306',
'username' = '${mysql.username}',
'password' = '${mysql.password}',
'database-name' = '${mysql.database}',
'table-name' = '${mysql.table}'
);
CREATE TABLE `kafka_sink` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter` STRING,
`tag` STRING,
`remark` STRING,
`create_date` TIMESTAMP,
`update_date` TIMESTAMP,
`reserve` STRING,
`sys_name` STRING,
`metric_seq` INT,
`advanced_function` STRING,
`value_type` STRING,
`value_field` STRING,
`status` INT,
`syn_date` TIMESTAMP,
`confirmer` STRING,
`confirm_time` TIMESTAMP,
`index_explain` STRING,
`field_name` STRING,
`tag_values` STRING
) WITH (
'connector' = 'kafka',
'topic' = '${topic}',
'properties.bootstrap.servers' = '${bootstrap.servers}',
'format' = 'canal-json'
);
INSERT INTO `kafka_sink`
(SELECT *
FROM `binlog_table`);