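I recently got a requirement to use Flink to consume the MySQL binlog and forward the change records to a downstream Kafka topic (similar to what canal server or Debezium does). Each downstream Kafka message needs to carry the fields before, after, op_type, ts, database, and table. I tried the script below and found that the resulting Kafka messages contain only data and op_type; none of the other fields are available. I traced this to the upstream Debezium layer (Flink CDC is built on Debezium): the records it emits already carry only data and op_type. Is there another way to get the original change record?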


CREATE TABLE `binlog_table` (
    `id` INT,
    `name` STRING,
    `sys_id` STRING,
    `sequence` INT,
    `filter` STRING,
    `tag` STRING,
    `remark` STRING,
    `create_date` TIMESTAMP,
    `update_date` TIMESTAMP,
    `reserve` STRING,
    `sys_name` STRING,
    `metric_seq` INT,
    `advanced_function` STRING,
    `value_type` STRING,
    `value_field` STRING,
    `status` INT,
    `syn_date` TIMESTAMP,
    `confirmer` STRING,
    `confirm_time` TIMESTAMP,
    `index_explain` STRING,
    `field_name` STRING,
    `tag_values` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '3306',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = '${mysql.database}',
    'table-name' = '${mysql.table}'
);


CREATE TABLE `kafka_sink` (
    `id` INT,
    `name` STRING,
    `sys_id` STRING,
    `sequence` INT,
    `filter` STRING,
    `tag` STRING,
    `remark` STRING,
    `create_date` TIMESTAMP,
    `update_date` TIMESTAMP,
    `reserve` STRING,
    `sys_name` STRING,
    `metric_seq` INT,
    `advanced_function` STRING,
    `value_type` STRING,
    `value_field` STRING,
    `status` INT,
    `syn_date` TIMESTAMP,
    `confirmer` STRING,
    `confirm_time` TIMESTAMP,
    `index_explain` STRING,
    `field_name` STRING,
    `tag_values` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '${topic}',
    'properties.bootstrap.servers' = '${bootstrap.servers}',
    'format' = 'canal-json'
);


INSERT INTO `kafka_sink`
SELECT *
FROM `binlog_table`;
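
One possible direction, sketched under assumptions rather than verified against this setup: newer releases of the mysql-cdc connector expose the read-only metadata keys database_name, table_name, and op_ts, which can be declared as virtual columns on the source table and forwarded to the sink. The table names `binlog_table_with_meta` and `kafka_sink_with_meta`, the column aliases, and the trimmed column list are hypothetical and only for illustration.

```sql
-- Sketch only: assumes a mysql-cdc connector version that supports
-- the 'database_name', 'table_name' and 'op_ts' metadata keys.
CREATE TABLE `binlog_table_with_meta` (
    `db_name`  STRING METADATA FROM 'database_name' VIRTUAL,
    `tbl_name` STRING METADATA FROM 'table_name' VIRTUAL,
    `op_ts`    TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    `id`       INT,
    `name`     STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '3306',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = '${mysql.database}',
    'table-name' = '${mysql.table}'
);

-- The sink declares the metadata as ordinary physical columns.
CREATE TABLE `kafka_sink_with_meta` (
    `db_name`  STRING,
    `tbl_name` STRING,
    `op_ts`    TIMESTAMP_LTZ(3),
    `id`       INT,
    `name`     STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '${topic}',
    'properties.bootstrap.servers' = '${bootstrap.servers}',
    'format' = 'canal-json'
);

INSERT INTO `kafka_sink_with_meta`
SELECT `db_name`, `tbl_name`, `op_ts`, `id`, `name`
FROM `binlog_table_with_meta`;
```

With this approach the database, table, and timestamp values would appear as regular fields inside data, not as top-level canal fields. It also does not produce before and after in a single message, because Flink's canal-json serializer encodes an update as a DELETE message followed by an INSERT message.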




