hi,大家好,我遇到一个问题。

下游系统发过来的数据是json数组,比如[{"name": "daluo", "age": 1}, {"name": "xiaoming",
"age": 2}],我想使用'connector.type' = 'kafka' 阅读此类数据,应该如何写如下的sql?

CREATE TABLE mykafka1 (name String, age Int) 
WITH (
   'connector.type' = 'kafka',
   'format.type' = 'json',
   'update-mode' = 'append'
);

还是说,先使用原生的FlinkKafkaConsumer读取变成DataStream<List&lt;data>>,再转换flatMap转换成DataStream<data>,再使用tableEnv.fromDataStream把它变成tableSource?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复