http://apache-flink.147419.n8.nabble.com/flink-1-10-sql-kafka-format-json-schema-json-object-tt4665.html
 这个邮件里提到了类似的问题。

https://issues.apache.org/jira/browse/FLINK-18002 这个issue完成后(1.12),你可以将
“data”,“mysqlType”等格式不确定的字段定义为String类型,
下游通过udf自己再解析对应的json


Best,
Godfrey

jindy_liu <[email protected]> 于2020年7月21日周二 下午12:37写道:

> 例如:
>
> mysql表:
> CREATE TABLE `test` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   `time` datetime NOT NULL,
>   `status` int(11) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> CREATE TABLE `status` (
>   `id` int(11) NOT NULL,
>   `name` varchar(255) NOT NULL,
>   PRIMARY KEY (`id`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8
>
> kafka中数据:
> // 表test 中insert事件
> {"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
>
> 18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
>
> //表status 中的事件
>
> {"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
>
> 如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
> 感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复