hi,
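OK, I tried this on my side and changed the schema definition of data to
ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
After also removing .jsonSchema(...), the program parses the data correctly.
If I keep .jsonSchema(...), however, the following exception is thrown:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
type.

The reason I kept the jsonSchema is that I would like to store the metadata
of this kind of complex JSON source in Hive and use it to work out the format
of the statement below. I don't know how the fields of a complex JSON document
like the one above should be mapped when defining that SQL. Is there an
example of mapping such complex JSON in SQL? Thanks.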
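
To make the question concrete, here is my untested guess at a DDL for the
JSON above (separate from the attached image). The column types mirror the
ARRAY(ROW(...)) schema that worked in the Table API; the WITH keys are my
assumption of how the connect() settings translate to 1.10 DDL properties,
and BIGINT for ts and putRowNum is also an assumption.

```sql
-- Sketch only: nested JSON array of objects mapped to ARRAY<ROW<...>>.
-- Column names that may clash with SQL keywords are quoted with backticks.
CREATE TABLE Test (
  business    STRING,
  `data`      ARRAY<ROW<tracking_number STRING, invoice_no STRING>>,
  `database`  STRING,
  `table`     STRING,
  ts          BIGINT,  -- assumption: epoch milliseconds fit in BIGINT
  `type`      STRING,
  putRowNum   BIGINT   -- assumption
) WITH (
  -- assumed 1.10-style equivalents of the Kafka() descriptor settings
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'mysql_binlog_test_str',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  -- no explicit JSON schema here, so it is derived from the table schema
  'format.type' = 'json'
);
```

If this is roughly right, I assume a nested field could then be read with
something like `data`[1].tracking_number, but I have not verified that.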
[image: image.png]

On Fri, Mar 20, 2020 at 11:42 AM Jark Wu <[email protected]> wrote:

> Hi,
>
> Looking at your data, "data" is of type array<row>, so the schema
> definition of data needs to be changed to
> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
> I also suggest removing .jsonSchema(...). Starting with 1.10, flink-json
> can derive the JSON schema automatically from the table schema.
>
> Best,
> Jark
>
> On Fri, 20 Mar 2020 at 11:34, 宇张 <[email protected]> wrote:
>
> > hi:
> > 1. When parsing JSON data, why is decimal used here rather than bigint?
> > [image: image.png]
> > 2. When using connect, I found that parsing the elements of a JSON array
> > throws an exception. Is this caused by misuse, or is it a bug?
> >
> > json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":19999}
> >
> > jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> >
> > connect:
> >
> > streamTableEnv
> >     .connect(
> >         new Kafka()
> >             .version("0.11")
> >             .topic("mysql_binlog_test_str")
> >             .startFromEarliest()
> >             .property("zookeeper.connect", "localhost:2181")
> >             .property("bootstrap.servers", "localhost:9092")
> >     )
> >     .withFormat(
> >         new Json()
> >             .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> >     )
> >     .withSchema(
> >         new Schema()
> >             .field("business", DataTypes.STRING())
> >             .field("data", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
> >                 DataTypes.FIELD("tracking_number", DataTypes.STRING()),
> >                 DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
> >             .field("database", DataTypes.STRING())
> >             .field("table", DataTypes.STRING())
> >             .field("ts", DataTypes.DECIMAL(38, 18))
> >             .field("type", DataTypes.STRING())
> >             .field("putRowNum", DataTypes.DECIMAL(38, 18))
> >     )
> >     .createTemporaryTable("Test");
> >
> > Exception:
> >
> > Caused by: java.io.IOException: Failed to deserialize JSON object.
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> >     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> >     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
> >     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> > Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> >     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> >     ... 7 more
