Hi,

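This depends on the Flink version you are using. In 1.11 the JSON format is derived automatically from the table schema, whereas in 1.10 you need to write the json-schema by hand if the table schema and the JSON schema are not exactly the same: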
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
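
For illustration, on 1.11 your table could probably be declared roughly as below, with no json-schema at all. This is only a sketch: it assumes you switch to the new 1.11 connector option keys, and the consumer group id 'user_log_group' is a placeholder I made up. Everything else is copied from your DDL.

```sql
-- Sketch for Flink 1.11: the JSON format is derived from the column definitions,
-- so no 'format.json-schema' entry is needed.
CREATE TABLE user_log (
    id STRING,
    timestam STRING,
    user_info ROW<user_id STRING, name STRING>,
    jsonArray ARRAY<ROW<user_id222 STRING, name222 STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'complex_string',
    'properties.bootstrap.servers' = 'venn:9092',
    'properties.group.id' = 'user_log_group',  -- hypothetical group id, replace with your own
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```

The nested ROW and ARRAY<ROW> columns map to a JSON object and an array of objects respectively, as described in the data type mapping section linked above.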


> On December 4, 2020, at 16:39, [email protected] wrote:
> 
> May I ask: I have already defined the structure on the columns, so do I still need to write format.json-schema?
> 
> CREATE TABLE user_log(
>    id VARCHAR,
>    timestam VARCHAR,
>    user_info ROW(user_id string, name string ),
>    jsonArray ARRAY<ROW(user_id222 STRING, name222 STRING)>
> ) WITH (
>    'connector.type' = 'kafka',
>    'connector.version' = 'universal',
>    'connector.topic' = 'complex_string',
>    'connector.properties.zookeeper.connect' = 'venn:2181',
>    'connector.properties.bootstrap.servers' = 'venn:9092',
>    'connector.startup-mode' = 'earliest-offset',
>    'format.type' = 'json',
>    'format.json-schema' = '{
>        "type": "object",
>        "properties": {
>           "id": {type: "string"},
>           "timestam": {type: "string"},
>           "user_info":{type: "object",
>                   "properties" : {
>                       "user_id" : {type:"string"},
>                       "name":{type:"string"}
>                   }
>             },
>            "jsonArray":{"type": "array",
>                         "items": {
>                                  "type": "object",
>                                  "properties" : {
>                                      "user_id222" : {type:"string"},
>                                      "name222" : {type:"string"}
>                                     }
>                                  }
>                         }
>        }
>    }'
> );
> 
> 
> 
> 
> [email protected]
