hi everyone,

zhangsan|man|28|[email protected]|{"math":98, "language":{"english":89, "french":95}}|china|beijing

This is one record from a Kafka topic. When I create a Kafka DDL to define a schema for it, the following exception is thrown:
Exception in thread "main" java.lang.IllegalArgumentException: Only simple types are supported in the second level nesting of fields 'alex_1' but was: ROW<`english` INT, `french` INT>
        at org.apache.flink.formats.csv.CsvRowSchemaConverter.validateNestedField(CsvRowSchemaConverter.java:220)
        at org.apache.flink.formats.csv.CsvRowSchemaConverter.convertType(CsvRowSchemaConverter.java:197)
        at org.apache.flink.formats.csv.CsvRowSchemaConverter.convert(CsvRowSchemaConverter.java:145)
        at org.apache.flink.formats.csv.CsvRowDataDeserializationSchema$Builder.<init>(CsvRowDataDeserializationSchema.java:98)
        at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:79)
        at org.apache.flink.formats.csv.CsvFormatFactory$1.createRuntimeDecoder(CsvFormatFactory.java:71)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:401)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:184)
        at org.apache.flink.table.planner.sources.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:262)
        at org.apache.flink.table.planner.sources.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:73)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:101)
        ...
        ...

DDL statement:
CREATE TABLE kafka_source (
    name STRING,
    sex STRING,
    age INT,
    mail STRING,
    alex_1 ROW<math INT, `language` ROW<english INT, french INT>>,
    country STRING,
    city STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic5-r1p3',
    'properties.bootstrap.servers' = '10.1.128.63:9092',
    'properties.group.id' = 'group5',
    'format' = 'csv',
    'csv.field-delimiter' = '|',
    'scan.startup.mode' = 'group-offsets'
)

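It looks like a problem with the nested ROW type, yet when I change the format to json the same schema works without error. How should the schema be defined when JSON data like this is embedded in a CSV record?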
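
For reference, one possible workaround is sketched below. It is not verified against this setup. The idea is to declare the JSON column as a plain STRING so the csv format never sees a nested ROW, and to extract the values in a query afterwards. The sketch assumes a Flink version that ships the built-in JSON_VALUE function (1.15 or later, as far as I know); on older versions a user-defined function would have to do the JSON parsing instead. The 'csv.disable-quote-character' option is also an assumption on my part, added so that the double quotes inside the JSON are read as ordinary characters.

```sql
-- Sketch only: read the JSON column as raw text instead of a nested ROW.
CREATE TABLE kafka_source (
    name STRING,
    sex STRING,
    age INT,
    mail STRING,
    alex_1 STRING,  -- raw JSON text, parsed later in the query
    country STRING,
    city STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic5-r1p3',
    'properties.bootstrap.servers' = '10.1.128.63:9092',
    'properties.group.id' = 'group5',
    'format' = 'csv',
    'csv.field-delimiter' = '|',
    -- assumption: do not treat the double quotes in the JSON as CSV quoting
    'csv.disable-quote-character' = 'true',
    'scan.startup.mode' = 'group-offsets'
);

-- Assumes JSON_VALUE is available in the Flink version in use.
SELECT
    name,
    JSON_VALUE(alex_1, '$.math' RETURNING INT) AS math,
    JSON_VALUE(alex_1, '$.language.english' RETURNING INT) AS english,
    JSON_VALUE(alex_1, '$.language.french' RETURNING INT) AS french
FROM kafka_source;
```

This keeps the '|' delimiter and the other connector options unchanged; only the type of alex_1 and the quote handling differ from the DDL above.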

best,
amenhub

