http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html 这个邮件里提到了类似的问题。
https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将 “data”,“mysqlType”等格式不确定的字段定义为String类型, 下游通过udf自己再解析对应的json Best, Godfrey jindy_liu <[email protected]> 于2020年7月21日周二 下午12:37写道: > 例如: > > mysql表: > CREATE TABLE `test` ( > `id` int(11) NOT NULL, > `name` varchar(255) NOT NULL, > `time` datetime NOT NULL, > `status` int(11) NOT NULL, > PRIMARY KEY (`id`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > > CREATE TABLE `status` ( > `id` int(11) NOT NULL, > `name` varchar(255) NOT NULL, > PRIMARY KEY (`id`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8 > > kafka中数据: > // 表test 中insert事件 > {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03 > > 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"} > > //表status 中的事件 > > {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"} > > 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData, > 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。 > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
