不同云厂商的数据同步工具在以 canal-json 格式将 MySQL 数据全量+增量同步到 Kafka 时,行为并不一致:有的厂商会把 DDL 语句也同步到 topic,导致下游 Flink SQL 作业因无法识别该消息而抛错。建议 Flink 在解析 canal-json 格式时直接忽略无法识别的 type,例如:
例1: {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}
例2:
{
  "action":"ALTER",
  "before":[],
  "bid":0,
  "data":[],
  "db":"db_test",
  "dbValType":{
    "col1":"varchar(22)",
    "col2":"varchar(22)",
    "col_pk":"varchar(22)"
  },
  "ddl":true,
  "entryType":"ROWDATA",
  "execTs":1669789188000,
  "jdbcType":{
    "col1":12,
    "col2":12,
    "col_pk":12
  },
  "pks":[],
  "schema":"db_test",
  "sendTs":1669789189533,
  "sql":"alter table table_test add col2 varchar(22) null",
  "table":"table_test",
  "tableChanges":{
    "table":{
      "columns":[
        {
          "jdbcType":12, // jdbc 类型。
          "name":"col1", // 字段名称。
          "position":0, // 字段的顺序。
          "typeExpression":"varchar(22)", // 类型描述。
          "typeName":"varchar" // 类型名称。
        },
        {
          "jdbcType":12,
          "name":"col2",
          "position":1,
          "typeExpression":"varchar(22)",
          "typeName":"varchar"
        },
        {
          "jdbcType":12,
          "name":"col_pk",
          "position":2,
          "typeExpression":"varchar(22)",
          "typeName":"varchar"
        }
      ],
      "primaryKeyColumnNames":["col_pk"] // 主键名列表。
    },
    "type":"ALTER"
  }
}