Hi 你好, 这个取决于你使用的flink版本,1.11版本会自动从table schema中解析,而1.10版本如果table schema和json schema不是完全相同的话,需要手动写json-schema: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format>
> 在 2020年12月4日,16:39,[email protected] 写道: > > 麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log( > id VARCHAR, > timestam VARCHAR, > user_info ROW(user_id string, name string ), > jsonArray ARRAY<ROW(user_id222 STRING, name222 STRING)> > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'complex_string', > 'connector.properties.zookeeper.connect' = 'venn:2181', > 'connector.properties.bootstrap.servers' = 'venn:9092', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'json', > 'format.json-schema' = '{ > "type": "object", > "properties": { > "id": {type: "string"}, > "timestam": {type: "string"}, > "user_info":{type: "object", > "properties" : { > "user_id" : {type:"string"}, > "name":{type:"string"} > } > }, > "jsonArray":{"type": "array", > "items": { > "type": "object", > "properties" : { > "user_id222" : {type:"string"}, > "name222" : {type:"string"} > } > } > } > } > }' > ); > > > > > [email protected]
