Hi, 能同时分享一下复现这个 case 的 SQL 以及相关的报错堆栈吗?
Best, Shengkai casel.chen <casel_c...@126.com> 于2023年2月9日周四 12:03写道: > 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致 > 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal > json格式解析时直接忽略不识别的type,例如 > 例1: > {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE > TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` > varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT > CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT > NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT > NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT > NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT > NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, > `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT > NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT > NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) > DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT > CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null} > > > 例2: > { > "action":"ALTER", > "before":[], > "bid":0, > "data":[], > "db":"db_test", > "dbValType":{ > "col1":"varchar(22)", > "col2":"varchar(22)", > "col_pk":"varchar(22)" > }, > "ddl":true, > "entryType":"ROWDATA", > "execTs":1669789188000, > "jdbcType":{ > "col1":12, > "col2":12, > "col_pk":12 > }, > "pks":[], > "schema":"db_test", > "sendTs":1669789189533, > "sql":"alter table table_test add col2 varchar(22) null", > "table":"table_test", > "tableChanges":{ > "table":{ > "columns":[ > { > "jdbcType":12, // jdbc 类型。 > "name":"col1", // 字段名称。 > "position":0, // 字段的顺序。 > "typeExpression":"varchar(22)", // 类型描述。 > "typeName":"varchar" // 类型名称。 > }, > { > 
"jdbcType":12, > "name":"col2", > "position":1, > "typeExpression":"varchar(22)", > "typeName":"varchar" > }, > { > "jdbcType":12, > "name":"col_pk", > "position":2, > "typeExpression":"varchar(22)", > "typeName":"varchar" > } > ], > "primaryKeyColumnNames":["col_pk"] // 主键名列表。 > }, > "type":"ALTER" > } > }