Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗?

Best,
Shengkai

casel.chen <casel_c...@126.com> 于2023年2月9日周四 12:03写道:

> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal
> json格式解析时直接忽略不识别的type,例如
> 例1:
> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
> TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby`
> varchar(255) DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT
> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT
> NULL DEFAULT '0000-00-00 00:00:00',  `updatedby` varchar(255) DEFAULT
> NULL,  `account` varchar(255) DEFAULT NULL,  `batch` varchar(255) DEFAULT
> NULL,  `client` varchar(255) DEFAULT NULL,  `command` varchar(255) DEFAULT
> NULL,  `container` varchar(255) DEFAULT NULL,  `items` mediumtext,
> `trackingnumber` varchar(255) NOT NULL,  `transporter` varchar(255) DEFAULT
> NULL,  `weight` decimal(19,2) NOT NULL,  `zipcode` varchar(255) DEFAULT
> NULL,  `ld3` varchar(255) DEFAULT NULL,  `destination_code` varchar(255)
> DEFAULT NULL,  PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT
> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
>
>
> 例2:
> {
>     "action":"ALTER",
>     "before":[],
>     "bid":0,
>     "data":[],
>     "db":"db_test",
>     "dbValType":{
>         "col1":"varchar(22)",
>         "col2":"varchar(22)",
>         "col_pk":"varchar(22)"
>     },
>     "ddl":true,
>     "entryType":"ROWDATA",
>     "execTs":1669789188000,
>     "jdbcType":{
>         "col1":12,
>         "col2":12,
>         "col_pk":12
>     },
>     "pks":[],
>     "schema":"db_test",
>     "sendTs":1669789189533,
>     "sql":"alter table table_test add col2 varchar(22) null",
>     "table":"table_test",
>     "tableChanges":{
>         "table":{
>             "columns":[
>                 {
>                     "jdbcType":12, // jdbc 类型。
>                     "name":"col1",    // 字段名称。
>                     "position":0,  // 字段的顺序。
>                     "typeExpression":"varchar(22)", // 类型描述。
>                     "typeName":"varchar" // 类型名称。
>                 },
>                 {
>                     "jdbcType":12,
>                     "name":"col2",
>                     "position":1,
>                     "typeExpression":"varchar(22)",
>                     "typeName":"varchar"
>                 },
>                 {
>                     "jdbcType":12,
>                     "name":"col_pk",
>                     "position":2,
>                     "typeExpression":"varchar(22)",
>                     "typeName":"varchar"
>                 }
>             ],
>             "primaryKeyColumnNames":["col_pk"] // 主键名列表。
>         },
>         "type":"ALTER"
>     }
> }

回复