Hi

这是1.11里的一个 json format t的不兼容改动[1],目的是支持更多的 timestamp format 
的解析,你可以把json-timestamp-format-standard 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard>设置成
 “ISO-8601”,应该就不用改动了。


Best
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard>

> 在 2020年7月23日,20:54,Zhou Zach <[email protected]> 写道:
> 
> 当前作业有个sink 
> connector消费不到数据,我找到原因了,根本原因是kafka中时间字段的问题,只是with子句新旧参数对相同的字段数据表现了不同的行为,kafka中的消息格式:
> 
> 
> {"uid":46,"sex":"female","age":11,"created_time":"2020-07-23T19:53:15.509Z"}
> 奇怪的是,在kafka_table DDL中,created_time 
> 定义为TIMESTAMP(3),with使用老参数是可以成功运行的,with使用新参数,在IDEA中运行没有任何异常,提交到yarn上,会报异常:
> java.lang.RuntimeException: RowTime field should not be null, please convert 
> it to a non-nulllong value.
>    at 
> org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115)
>  ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> 
> 
> 在本地用如下函数测试,结果确实是NULL
> TO_TIMESTAMP('2020-07-23T19:53:15.509Z')
> kafka producuer将created_time字段设置为整型,或者 “2020-07-23 
> 20:36:55.565”,with使用新参数是没有问题的。调了一下午,调到怀疑人生,还好发现问题
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-23 20:10:43,"Leonard Xu" <[email protected]> 写道:
>> Hi
>> 
>> 你说的下游消费不到数据,这个下游是指当前作业消费不到数据吗?
>> 
>> 正常应该不会的,可以提供个可复现代码吗? 
>> 
>> 祝好
>> Leonard Xu
>> 
>> 
>>> 在 2020年7月23日,18:13,Zhou Zach <[email protected]> 写道:
>>> 
>>> Hi all,
>>> 
>>> 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position,
>>> 使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊
>>> 
>>> 
>>> 老参数:
>>>   streamTableEnv.executeSql(
>>>     """
>>>       |
>>>       |CREATE TABLE kafka_table (
>>>       |    uid BIGINT,
>>>       |    sex VARCHAR,
>>>       |    age INT,
>>>       |    created_time TIMESTAMP(3),
>>>       |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>>>       |) WITH (
>>>       |
>>>       |     'connector.type' = 'kafka',
>>>       |    'connector.version' = 'universal',
>>>       |    'connector.topic' = 'user',
>>>       |    'connector.startup-mode' = 'latest-offset',
>>>       |    'connector.properties.zookeeper.connect' = 
>>> 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>       |    'connector.properties.bootstrap.servers' = 
>>> 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>       |    'connector.properties.group.id' = 'user_flink',
>>>       |    'format.type' = 'json',
>>>       |    'format.derive-schema' = 'true'
>>>       |
>>>       |)
>>>       |""".stripMargin)
>>> 
>>> 新参数:
>>> 
>>>   streamTableEnv.executeSql(
>>>     """
>>>       |
>>>       |CREATE TABLE kafka_table (
>>>       |
>>>       |    uid BIGINT,
>>>       |    sex VARCHAR,
>>>       |    age INT,
>>>       |    created_time TIMESTAMP(3),
>>>       |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>>>       |) WITH (
>>>       |    'connector' = 'kafka',
>>>       |     'topic' = 'user',
>>>       |    'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>       |    'properties.group.id' = 'user_flink',
>>>       |    'scan.startup.mode' = 'latest-offset',
>>>       |    'format' = 'json',
>>>       |    'json.fail-on-missing-field' = 'false',
>>>       |    'json.ignore-parse-errors' = 'true'
>>>       |)
>>>       |""".stripMargin)

回复