I read the source code: even when the output format is switched to debezium-json, you still cannot get the original Debezium JSON, because the output has only three fields and is missing the key database/table information. The other CDC formats I looked at all have the same problem. I would like to know why. Tracing up to the upstream Debezium emitRecords method, the record parameter carries only the RowData and RowKind; there is no table or database.
DebeziumJsonSerializationSchema.java:

private static RowType createJsonRowType(DataType databaseSchema) {
    // Debezium JSON contains some other information, e.g. "source", "ts_ms"
    // but we don't need them.
    return (RowType)
            DataTypes.ROW(
                            DataTypes.FIELD("before", databaseSchema),
                            DataTypes.FIELD("after", databaseSchema),
                            DataTypes.FIELD("op", DataTypes.STRING()))
                    .getLogicalType();
}
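
As far as I can tell, the table source converts each SourceRecord into RowData as soon as it leaves the Debezium engine, so by the time a sink format such as debezium-json runs, only the RowData and its RowKind survive. If the database/table info is really needed, one workaround seems to be the DataStream API, where a custom deserializer is handed the raw Kafka Connect SourceRecord and the Debezium "source" struct (with db and table) is still intact. A rough sketch, assuming flink-cdc-connectors 1.x (package names moved in 2.x); hostname, credentials and table names below are placeholders:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class SourceInfoDeserializer implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        if (!(record.value() instanceof Struct)) {
            return; // skip tombstones and other non-data events
        }
        Struct value = (Struct) record.value();
        // The Debezium envelope still has the metadata that the SQL formats drop.
        Struct source = value.getStruct("source");
        String db = source.getString("db");
        String table = source.getString("table");
        String op = value.getString("op"); // c / u / d / r
        out.collect(db + "." + table + " op=" + op);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(
                        MySQLSource.<String>builder()
                                .hostname("localhost")          // placeholder
                                .port(3306)
                                .databaseList("mydb")           // placeholder
                                .tableList("mydb.binlog_table") // placeholder
                                .username("...")
                                .password("...")
                                .deserializer(new SourceInfoDeserializer())
                                .build())
                .print();
        env.execute("cdc-with-source-info");
    }
}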
On 2021-04-21 14:18:42, "飞翔" <[email protected]> wrote:
>If that is really the goal, don't use flink-cdc at all; just read from Kafka directly and set the format to canal-json.
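>Something like this on the Flink side (a sketch; the topic, servers and columns are placeholders):
>
>CREATE TABLE `canal_source` (
>  `id` INT,
>  `name` STRING
>) WITH (
>  'connector' = 'kafka',
>  'topic' = 'canal_topic',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'scan.startup.mode' = 'earliest-offset',
>  'format' = 'canal-json'
>);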
>
>
>------------------ Original Message ------------------
>From: "user-zh" <[email protected]>;
>Sent: Wednesday, April 21, 2021, 2:16 PM
>To: "user-zh" <[email protected]>;
>Subject: Re: Does Flink SQL not support sending MySQL CDC to Kafka in canal-json format?
>
>
>
>Hi casel.
>flink-cdc-connectors embeds the Debezium engine, so it should not support the Canal format.
>
>https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>
casel.chen <[email protected]> wrote on Tue, Apr 20, 2021 at 6:18 PM:
>
>> The goal is to use a Flink job to provide functionality similar to a canal server.
>>
>>
>> CREATE TABLE `binlog_table` (
>>   `id` INT,
>>   `name` STRING,
>>   `sys_id` STRING,
>>   `sequence` INT,
>>   `filter` STRING,
>>   `tag` STRING,
>>   `remark` STRING,
>>   `create_date` TIMESTAMP,
>>   `update_date` TIMESTAMP,
>>   `reserve` STRING,
>>   `sys_name` STRING,
>>   `metric_seq` INT,
>>   `advanced_function` STRING,
>>   `value_type` STRING,
>>   `value_field` STRING,
>>   `status` INT,
>>   `syn_date` TIMESTAMP,
>>   `confirmer` STRING,
>>   `confirm_time` TIMESTAMP,
>>   `index_explain` STRING,
>>   `field_name` STRING,
>>   `tag_values` STRING,
>>   PRIMARY KEY (`id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'mysql-cdc',
>>   'hostname' = '${mysql.hostname}',
>>   'port' = '3306',
>>   'username' = '${mysql.username}',
>>   'password' = '${mysql.password}',
>>   'database-name' = '${mysql.database}',
>>   'table-name' = '${mysql.table}'
>> );
>>
>> CREATE TABLE `kafka_sink` (
>>   `id` INT,
>>   `name` STRING,
>>   `sys_id` STRING,
>>   `sequence` INT,
>>   `filter` STRING,
>>   `tag` STRING,
>>   `remark` STRING,
>>   `create_date` TIMESTAMP,
>>   `update_date` TIMESTAMP,
>>   `reserve` STRING,
>>   `sys_name` STRING,
>>   `metric_seq` INT,
>>   `advanced_function` STRING,
>>   `value_type` STRING,
>>   `value_field` STRING,
>>   `status` INT,
>>   `syn_date` TIMESTAMP,
>>   `confirmer` STRING,
>>   `confirm_time` TIMESTAMP,
>>   `index_explain` STRING,
>>   `field_name` STRING,
>>   `tag_values` STRING,
>>   PRIMARY KEY (`id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = '${topic}',
>>   'properties.bootstrap.servers' = '${bootstrap.servers}',
>>   'format' = 'canal-json'
>> );
>>
>> INSERT INTO `kafka_sink`
>> (SELECT * FROM `binlog_table`);
>>
>> The result comes out like this:
>>
>> {
>> "data": [
>> {
>> "id": 3,
>> "name": "自动付款接口BuyETC金额",
>> "sys_id": "0184",
>> "sequence": 2,
>> "filter": "(a=1)",
>> "tag": "MerId(商户号)",
>> "remark": "O",
>> "create_date": "2020-11-02 15:01:31",
>> "update_date": "2021-04-07 09:23:59",
>> "reserve": "",
>> "sys_name": "NHL",
>> "metric_seq": 0,
>> "advanced_function": "",
>> "value_type": "sum",
>> "value_field": "value",
>> "status": 1,
>> "syn_date": "2021-01-28 19:31:36",
>> "confirmer": null,
>> "confirm_time": null,
>> "index_explain": "aa",
>> "field_name": null,
>> "tag_values": null
>> }
>> ],
>> "type": "INSERT"
>> }
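>> For comparison, a message from a real canal server also carries the source metadata. Roughly like this (field set from canal's FlatMessage JSON; the values are made up for illustration):
>>
>> {
>>   "data": [{"id": "3", "name": "自动付款接口BuyETC金额"}],
>>   "database": "mydb",
>>   "table": "binlog_table",
>>   "pkNames": ["id"],
>>   "isDdl": false,
>>   "type": "INSERT",
>>   "es": 1604300491000,
>>   "ts": 1604300491123,
>>   "sql": "",
>>   "mysqlType": {"id": "int(11)", "name": "varchar(255)"},
>>   "sqlType": {"id": 4, "name": 12},
>>   "old": null
>> }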
>> What the Flink job emits has only "data" and "type", which is not the standard Canal JSON format. Switching to the upsert-kafka connector did not work either:
>>
>>
>>
>> CREATE TABLE `kafka_sink` (
>>   `id` INT, `name` STRING, `sys_id` STRING, `sequence` INT,
>>   `filter` STRING, `tag` STRING, `remark` STRING,
>>   `create_date` TIMESTAMP, `update_date` TIMESTAMP, `reserve` STRING,
>>   `sys_name` STRING, `metric_seq` INT, `advanced_function` STRING,
>>   `value_type` STRING, `value_field` STRING, `status` INT,
>>   `syn_date` TIMESTAMP, `confirmer` STRING, `confirm_time` TIMESTAMP,
>>   `index_explain` STRING, `field_name` STRING, `tag_values` STRING,
>>   PRIMARY KEY (`id`) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'upsert-kafka',
>>   'topic' = '${topic}',
>>   'properties.bootstrap.servers' = '${bootstrap.servers}',
>>   'key.format' = 'json',
>>   'value.format' = 'json'
>> );
>>
>> The output data looks like this:
>>
>> {"id":9330,"name":"发展商户商户进件页面点击提交按钮00010017","sys_id":"0226","sequence":3607,"filter":null,"tag":"","remark":null,"create_date":"2021-04-06 12:27:30","update_date":"2021-04-06 12:27:30","reserve":null,"sys_name":"STAR","metric_seq":0,"advanced_function":null,"value_type":"count","value_field":"value","status":1,"syn_date":"2021-04-07 16:47:59","confirmer":null,"confirm_time":null,"index_explain":"发展商户商户进件页面点击提交按钮00010017","field_name":null,"tag_values":null}