Hi everyone,
zhangsan|man|28|[email protected]|{"math":98, "language":{"english":89, "french":95}}|china|beijing
The line above is a record from a Kafka message queue. When I create a Kafka DDL to define a schema for it, the following exception is thrown:
Exception in thread "main" java.lang.IllegalArgumentException: Only simple types are supported in the second level nesting of fields 'alex_1' but was: ROW<`english` INT, `french` INT>
    at org.apache.flink.formats.csv.CsvRowSchemaConverter.validateNestedField(CsvRowSchemaConverter.java:220)
    at org.apache.flink.formats.csv.CsvRowSchemaConverter.convertType(CsvRowSchemaConverter.java:197)
    at org.apache.flink.formats.csv.CsvRowSchemaConverter.convert(CsvRowSchemaConverter.java:145)
    at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema$Builder.<init>(CsvRowDataDeserializationSchema.java:98)
    at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:79)
    at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:71)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:401)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:184)
    at org.apache.flink.table.planner.sources.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:262)
    at org.apache.flink.table.planner.sources.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:73)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:101)
...
...
DDL statement:
CREATE TABLE kafka_source (
  name STRING,
  sex STRING,
  age INT,
  mail STRING,
  alex_1 ROW<math INT, `language` ROW<english INT, french INT>>,
  country STRING,
  city STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic5-r1p3',
  'properties.bootstrap.servers' = '10.1.128.63:9092',
  'properties.group.id' = 'group5',
  'format' = 'csv',
  'csv.field-delimiter' = '|',
  'scan.startup.mode' = 'group-offsets'
)
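This looks like a problem with the nested ROW type: the csv format rejects a ROW at the second nesting level, yet the same schema works without error when I change the format to json. How should the schema be defined for JSON data embedded in a CSV record like this?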
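One possible direction, shown only as an untested sketch: declare the JSON column as a plain STRING so that the csv format never sees a nested ROW, and extract the values in the query instead. The table name kafka_source_raw and the output column aliases are hypothetical. The sketch also assumes a Flink version that ships the built-in JSON_VALUE function (1.15 or later, as far as I know); the version behind the stack trace above may predate it, in which case a user-defined function would have to do the JSON parsing.

```sql
-- Sketch only: alex_1 is read as raw JSON text instead of a nested ROW.
CREATE TABLE kafka_source_raw (      -- hypothetical table name
  name STRING,
  sex STRING,
  age INT,
  mail STRING,
  alex_1 STRING,                     -- e.g. {"math":98, "language":{...}}
  country STRING,
  city STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic5-r1p3',
  'properties.bootstrap.servers' = '10.1.128.63:9092',
  'properties.group.id' = 'group5',
  'format' = 'csv',
  'csv.field-delimiter' = '|',
  'scan.startup.mode' = 'group-offsets'
);

-- Extract the nested values at query time (assumes JSON_VALUE is available).
SELECT
  name,
  CAST(JSON_VALUE(alex_1, '$.math') AS INT) AS math,
  CAST(JSON_VALUE(alex_1, '$.language.english') AS INT) AS english,
  CAST(JSON_VALUE(alex_1, '$.language.french') AS INT) AS french
FROM kafka_source_raw;
```

Because the JSON text itself contains double quotes, the csv format's quote handling (for example the 'csv.disable-quote-character' option) may also need attention; that part has not been checked here.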
Best,
amenhub