Hi,
I made the change as suggested, but it still fails:
Query:
import java.time.Duration

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Event-time streaming job with RocksDB state checkpointed to HDFS
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.setStateBackend(new RocksDBStateBackend("hdfs://nameservice1/flink/checkpoints"))

// Blink planner in streaming mode
val blinkEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, blinkEnvSettings)

// Exactly-once checkpoints every 20 s; a checkpoint may take up to 900 s before it is discarded
streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
streamTableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(900))

// Kafka source with the new (1.11) option names. created_time arrives as e.g.
// "2020-07-23T19:53:15.509Z" and is the rowtime; the watermark trails it by 3 s.
streamTableEnv.executeSql(
  """
    |CREATE TABLE kafka_table (
    | uid BIGINT,
    | sex VARCHAR,
    | age INT,
    | created_time TIMESTAMP(3),
    | procTime AS PROCTIME(),
    | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    | 'connector' = 'kafka',
    | 'topic' = 'user',
    | 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
    | 'properties.group.id' = 'user_flink',
    | 'scan.startup.mode' = 'latest-offset',
    | 'format' = 'json',
    | 'json.fail-on-missing-field' = 'false',
    | 'json.ignore-parse-errors' = 'true',
    | 'json.timestamp-format.standard' = 'ISO-8601'
    |)
    |""".stripMargin)

// Print sink for debugging
streamTableEnv.executeSql(
  """
    |CREATE TABLE print_table (
    | uid BIGINT,
    | sex VARCHAR,
    | age INT,
    | created_time TIMESTAMP(3)
    |) WITH ('connector' = 'print')
    |""".stripMargin)

streamTableEnv.executeSql(
  """
    |INSERT INTO print_table
    |SELECT uid, sex, age, created_time
    |FROM kafka_table
    |""".stripMargin)
Stack trace:
2020-07-24 10:33:32,852 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1595558012852
2020-07-24 10:33:32,853 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Subscribed to partition(s): user-0
2020-07-24 10:33:32,853 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Seeking to offset 36627 for partition user-0
2020-07-24 10:33:32,860 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-user_flink-12, groupId=user_flink] Cluster ID: cAT_xBISQNWghT9kR5UuIw
2020-07-24 10:33:32,871 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_table]], fields=[uid, sex, age, created_time]) -> Calc(select=[uid, sex, age, created_time, () AS procTime]) -> WatermarkAssigner(rowtime=[created_time], watermark=[(created_time - 3000:INTERVAL SECOND)]) -> Calc(select=[uid, sex, age, created_time]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[uid, sex, age, created_time]) (2/4) (6b585139c083982beb6997e1ae2041ed) switched from RUNNING to FAILED.
java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
    at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115)
    ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
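The exception says created_time reached the watermark assigner as NULL. One plausible reason, consistent with the TO_TIMESTAMP test further down this thread, is that the Kafka value "2020-07-23T19:53:15.509Z" ends in a `Z` (UTC offset) that a zone-less timestamp pattern cannot consume; with 'json.ignore-parse-errors' = 'true', a failed parse may then surface as NULL instead of an error. The sketch below checks the value shapes seen in this thread against plain java.time parsers. It uses only the JDK and is not Flink's JSON deserializer; treating Flink's 'ISO-8601' setting as behaving like `DateTimeFormatter.ISO_LOCAL_DATE_TIME` is an assumption, not something confirmed here.

```scala
import java.time.{Instant, LocalDateTime}
import java.time.format.{DateTimeFormatter, DateTimeParseException}

// Standalone JDK check (not Flink code): which timestamp shapes from this
// thread can a zone-less parser read in full?
object TimestampParseCheck {
  // Space-separated shape, e.g. "2020-07-23 20:36:55.565"
  private val SpaceSeparated = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // True only if the whole string parses as a LocalDateTime (no zone/offset);
  // leftover text such as a trailing 'Z' makes the parse throw.
  private def parsesAsLocal(text: String, fmt: DateTimeFormatter): Boolean =
    try { LocalDateTime.parse(text, fmt); true }
    catch { case _: DateTimeParseException => false }

  def parsesSpaceSeparated(text: String): Boolean = parsesAsLocal(text, SpaceSeparated)

  // 'T'-separated, zone-less ISO shape, e.g. "2020-07-23T19:53:15.509"
  def parsesIsoLocal(text: String): Boolean =
    parsesAsLocal(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME)

  // Offset-aware parse: Instant.parse accepts the trailing 'Z' (UTC)
  def parsesIsoInstant(text: String): Boolean =
    try { Instant.parse(text); true }
    catch { case _: DateTimeParseException => false }
}
```

For the Kafka value in this thread, only `parsesIsoInstant` returns true; removing the `Z` makes `parsesIsoLocal` succeed, and the space-separated form is accepted only by `parsesSpaceSeparated`.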
On 2020-07-23 21:23:28, "Leonard Xu" <[email protected]> wrote:
>Hi
>
>This is an incompatible change to the JSON format in 1.11 [1], made to support
>parsing more timestamp formats. If you set 'json.timestamp-format.standard'
><https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard>
>to 'ISO-8601', you shouldn't need any other changes.
>
>
>Best
>Leonard Xu
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#json-timestamp-format-standard
>
>> On Jul 23, 2020, at 20:54, Zhou Zach <[email protected]> wrote:
>>
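>> A sink connector in the current job was not receiving any data, and I have
>> found the cause. The root cause is the time field in the Kafka messages: the
>> old and new WITH-clause options behave differently on the same field data.
>> The message format in Kafka is: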
>>
>>
>> {"uid":46,"sex":"female","age":11,"created_time":"2020-07-23T19:53:15.509Z"}
>> The strange part: in the kafka_table DDL, created_time is defined as
>> TIMESTAMP(3). With the old WITH options the job runs fine. With the new
>> options it runs without any exception in IDEA, but when submitted to YARN it
>> throws:
>> java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
>>     at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115)
>>     ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>>
>>
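>> Testing locally with the following expression does indeed return NULL:
>> TO_TIMESTAMP('2020-07-23T19:53:15.509Z')
>> If the Kafka producer writes created_time as an integer, or as a string like
>> "2020-07-23 20:36:55.565", the new options work without problems. I spent a
>> whole afternoon debugging this and was starting to question everything;
>> luckily I found the problem.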
>>
>> On 2020-07-23 20:10:43, "Leonard Xu" <[email protected]> wrote:
>>> Hi
>>>
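>>> When you say downstream receives no data, do you mean the current job
>>> itself receives no data?
>>>
>>> That normally shouldn't happen. Could you provide code that reproduces it?
>>>
>>> Best regards,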
>>> Leonard Xu
>>>
>>>
>>>> On Jul 23, 2020, at 18:13, Zhou Zach <[email protected]> wrote:
>>>>
>>>> Hi all,
>>>>
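>>>> Following the documentation
>>>> (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position),
>>>> when I create kafka_table with the new options, downstream receives no
>>>> data, but with the old options it does. Is there a pitfall in the new options?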
>>>>
>>>>
>>>> Old options:
>>>> streamTableEnv.executeSql(
>>>> """
>>>> |
>>>> |CREATE TABLE kafka_table (
>>>> | uid BIGINT,
>>>> | sex VARCHAR,
>>>> | age INT,
>>>> | created_time TIMESTAMP(3),
>>>> | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>>>> |) WITH (
>>>> |
>>>> | 'connector.type' = 'kafka',
>>>> | 'connector.version' = 'universal',
>>>> | 'connector.topic' = 'user',
>>>> | 'connector.startup-mode' = 'latest-offset',
>>>> | 'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>>> | 'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>> | 'connector.properties.group.id' = 'user_flink',
>>>> | 'format.type' = 'json',
>>>> | 'format.derive-schema' = 'true'
>>>> |
>>>> |)
>>>> |""".stripMargin)
>>>>
>>>> New options:
>>>>
>>>> streamTableEnv.executeSql(
>>>> """
>>>> |
>>>> |CREATE TABLE kafka_table (
>>>> |
>>>> | uid BIGINT,
>>>> | sex VARCHAR,
>>>> | age INT,
>>>> | created_time TIMESTAMP(3),
>>>> | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>>>> |) WITH (
>>>> | 'connector' = 'kafka',
>>>> | 'topic' = 'user',
>>>> | 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>>> | 'properties.group.id' = 'user_flink',
>>>> | 'scan.startup.mode' = 'latest-offset',
>>>> | 'format' = 'json',
>>>> | 'json.fail-on-missing-field' = 'false',
>>>> | 'json.ignore-parse-errors' = 'true'
>>>> |)
>>>> |""".stripMargin)
>
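Zhou Zach reports in this thread that the job works with the new options when the producer writes created_time as an integer or as a string like "2020-07-23 20:36:55.565". A minimal producer-side sketch of the string route, assuming the source values are always UTC instants with a trailing `Z`: parse with `Instant.parse` and re-render in UTC without the zone suffix. `CreatedTimeNormalizer` is a hypothetical helper, not a Flink or Kafka API. The `'T'`-separated variant is meant for 'json.timestamp-format.standard' = 'ISO-8601'; that it matches what Flink expects is an assumption based on the option name and has not been tested here.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical producer-side helper (not a Flink or Kafka API).
object CreatedTimeNormalizer {
  // Zone-less "yyyy-MM-dd HH:mm:ss.SSS": the shape reported to work in this
  // thread with the new options and no 'json.timestamp-format.standard' line.
  val SqlStyle: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC)

  // Zone-less "yyyy-MM-dd'T'HH:mm:ss.SSS" (assumed target for 'ISO-8601').
  val IsoLocalStyle: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS").withZone(ZoneOffset.UTC)

  // Parses an ISO-8601 instant such as "2020-07-23T19:53:15.509Z" and
  // re-renders it as UTC wall-clock time, dropping the trailing 'Z'.
  def normalize(isoInstant: String, target: DateTimeFormatter): String =
    target.format(Instant.parse(isoInstant))
}
```

For example, `CreatedTimeNormalizer.normalize("2020-07-23T19:53:15.509Z", CreatedTimeNormalizer.SqlStyle)` yields "2020-07-23 19:53:15.509". Rendering in UTC keeps the same wall-clock value the `Z` denotes; if event time should follow a local zone instead, swap `ZoneOffset.UTC` for that zone.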