Hi,

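The images you sent are all broken. Please paste the text directly, or upload the images to an image hosting service first and paste the links here.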

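1. What error is thrown when you use DECIMAL?
2. If you keep jsonSchema, you have to make sure the table schema and the json schema are consistent. In other words, not only
must the table schema be correct, the json schema must be correct as well.
    That adds a lot of extra cost, so we generally recommend not configuring jsonSchema. In theory, the table schema can map all of the complex formats.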
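
For reference, here is a minimal sketch of a source definition that relies on the table schema alone, with no jsonSchema. It assumes Flink 1.10 with the Kafka 0.11 connector and flink-json on the classpath. The class name and the use of BIGINT for ts and putRowNum are assumptions for illustration only (you reported that non-DECIMAL types work); this is not a confirmed fix for the DECIMAL problem.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.DataType;

// Hypothetical class name, used only for this sketch.
public class NestedJsonSourceSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env);

        // "data" is a JSON array of objects, so it is declared as ARRAY(ROW(...)).
        DataType dataType = DataTypes.ARRAY(
                DataTypes.ROW(
                        DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                        DataTypes.FIELD("invoice_no", DataTypes.STRING())));

        Schema schema = new Schema()
                .field("business", DataTypes.STRING())
                .field("data", dataType)
                .field("database", DataTypes.STRING())
                .field("table", DataTypes.STRING())
                // Assumption: BIGINT instead of DECIMAL(38, 18).
                .field("ts", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("putRowNum", DataTypes.BIGINT());

        Kafka kafka = new Kafka()
                .version("0.11")
                .topic("mysql_binlog_test_str")
                .startFromEarliest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092");

        streamTableEnv
                .connect(kafka)
                // No .jsonSchema(...): the format is derived from the table schema.
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("Test");
    }
}
```

Keeping the nested type in one place (dataType) means there is only one schema to maintain, which is the point of dropping jsonSchema.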

Best,
Jark


On Fri, 20 Mar 2020 at 14:48, 宇张 <[email protected]> wrote:

> Hi,
> OK, testing shows that Decimal does not work, not even DECIMAL(38, 18). Switching to another type works. I am not sure whether this is a bug.
> [image: image.png]
>
> On Fri, Mar 20, 2020 at 2:17 PM 宇张 <[email protected]> wrote:
>
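>> Hi, I tried again. When the JSON data contains numeric fields, the program still
>> fails to run, even after changing the schema definition of data to
>> ARRAY(ROW(...))
>> and removing .jsonSchema(...). It works when there are no numeric fields. Judging
>> from the error output, the two structures match, but validation still seems to fail.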
>> [image: image.png]
>>
>>
>> On Fri, Mar 20, 2020 at 12:08 PM 宇张 <[email protected]> wrote:
>>
>>> Hi,
>>> OK, I tried it. I changed the schema definition of data to
>>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no",
>>> STRING)))
>>> and removed .jsonSchema(...), and the data now parses without problems. But if I keep
>>> .jsonSchema(...), the following exception is thrown: Exception in thread "main"
>>> org.apache.flink.table.api.ValidationException: Type
>>> ARRAY<ROW<`tracking_number` STRING, `invoice_no` STRING>> of table field
>>> 'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
>>> STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
>>> type.
>>>
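>>> The reason I kept the jsonSchema is that I want to try saving the metadata of this kind of complex JSON source to Hive, and then use it to infer the format of the statement below. I do not know how to map the field information when defining the SQL below for such complex JSON. Is there an example of a SQL mapping for this kind of complex JSON? Thanks.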
>>> [image: image.png]
>>>
>>> On Fri, Mar 20, 2020 at 11:42 AM Jark Wu <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Looking at your data, "data" is of type array<row>, so the schema definition of data needs to be changed to
>>>> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no",
>>>> STRING)))
>>>> I also suggest removing .jsonSchema(...). Since 1.10, flink-json can automatically derive the
>>>> json schema from the table schema.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Fri, 20 Mar 2020 at 11:34, 宇张 <[email protected]> wrote:
>>>>
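>>>> > Hi,
>>>> > 1. When parsing JSON data, why is decimal used here instead of bigint?
>>>> > [image: image.png]
>>>> > 2. When using connect, I found that parsing JSON array elements throws an exception. Is this caused by misuse, or is it a bug?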
>>>> >
>>>> >
>>>> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":19999}
>>>> >
>>>> >
>>>> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
>>>> > connect:
>>>> >
>>>> > streamTableEnv
>>>> >         .connect(
>>>> >                 new Kafka()
>>>> >                         .version("0.11")
>>>> >                         .topic("mysql_binlog_test_str")
>>>> >                         .startFromEarliest()
>>>> >                         .property("zookeeper.connect", "localhost:2181")
>>>> >                         .property("bootstrap.servers", "localhost:9092")
>>>> >         )
>>>> >         .withFormat(
>>>> >                 new Json()
>>>> >                         .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
>>>> >         )
>>>> >         .withSchema(
>>>> >                 new Schema()
>>>> >                         .field("business", DataTypes.STRING())
>>>> >                         .field("data", DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
>>>> >                                 DataTypes.FIELD("tracking_number", DataTypes.STRING()),
>>>> >                                 DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
>>>> >                         .field("database", DataTypes.STRING())
>>>> >                         .field("table", DataTypes.STRING())
>>>> >                         .field("ts", DataTypes.DECIMAL(38, 18))
>>>> >                         .field("type", DataTypes.STRING())
>>>> >                         .field("putRowNum", DataTypes.DECIMAL(38, 18))
>>>> >         )
>>>> >         .createTemporaryTable("Test");
>>>> >
>>>> > Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
>>>> >
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>>>> > at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>>>> > at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
>>>> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>>> > Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>> > ... 7 more
>>>> >
>>>> >
>>>> >
>>>>
>>>
