不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal json格式解析时直接忽略不识别的type,例如
例1:
{"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
 TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby` varchar(255) 
DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT NULL DEFAULT '0000-00-00 
00:00:00',  `updatedby` varchar(255) DEFAULT NULL,  `account` varchar(255) 
DEFAULT NULL,  `batch` varchar(255) DEFAULT NULL,  `client` varchar(255) 
DEFAULT NULL,  `command` varchar(255) DEFAULT NULL,  `container` varchar(255) 
DEFAULT NULL,  `items` mediumtext,  `trackingnumber` varchar(255) NOT NULL,  
`transporter` varchar(255) DEFAULT NULL,  `weight` decimal(19,2) NOT NULL,  
`zipcode` varchar(255) DEFAULT NULL,  `ld3` varchar(255) DEFAULT NULL,  
`destination_code` varchar(255) DEFAULT NULL,  PRIMARY KEY 
(`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT 
CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}


例2:
{
    "action":"ALTER",
    "before":[],
    "bid":0,
    "data":[],
    "db":"db_test",
    "dbValType":{
        "col1":"varchar(22)",
        "col2":"varchar(22)",
        "col_pk":"varchar(22)"
    },
    "ddl":true,
    "entryType":"ROWDATA",
    "execTs":1669789188000,
    "jdbcType":{
        "col1":12,
        "col2":12,
        "col_pk":12
    },
    "pks":[],
    "schema":"db_test",
    "sendTs":1669789189533,
    "sql":"alter table table_test add col2 varchar(22) null",
    "table":"table_test",
    "tableChanges":{
        "table":{
            "columns":[
                {
                    "jdbcType":12, // jdbc 类型。
                    "name":"col1",    // 字段名称。
                    "position":0,  // 字段的顺序。
                    "typeExpression":"varchar(22)", // 类型描述。
                    "typeName":"varchar" // 类型名称。
                },
                {
                    "jdbcType":12,
                    "name":"col2",
                    "position":1, 
                    "typeExpression":"varchar(22)", 
                    "typeName":"varchar" 
                },
                {
                    "jdbcType":12, 
                    "name":"col_pk",   
                    "position":2,  
                    "typeExpression":"varchar(22)", 
                    "typeName":"varchar" 
                }
            ],
            "primaryKeyColumnNames":["col_pk"] // 主键名列表。
        },
        "type":"ALTER"
    }
}

回复