Hi

When you say the downstream can't consume any data, do you mean the current job itself consumes no data?

Normally this shouldn't happen. Could you share a minimal reproducible example?
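One thing worth double-checking in the meantime: both DDLs start from the latest offset, so the job only sees records produced after it is submitted. As a quick sanity check (assuming the topic already contains data), you could try reading from the beginning with the new options:

```sql
-- sanity check: consume the topic from the beginning instead of the tail
'scan.startup.mode' = 'earliest-offset'
```

If data shows up with earliest-offset, the new options are working and the difference is likely timing rather than the connector itself.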

Best regards,
Leonard Xu


> On Jul 23, 2020, at 18:13, Zhou Zach <[email protected]> wrote:
> 
> Hi all,
> 
> Following the docs at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position,
> I created kafka_table with the new connector options and the downstream can't consume any data; with the old options the downstream does get data. Is there a pitfall in the new option style?
> 
> 
> Old options:
>    streamTableEnv.executeSql(
>      """
>        |
>        |CREATE TABLE kafka_table (
>        |    uid BIGINT,
>        |    sex VARCHAR,
>        |    age INT,
>        |    created_time TIMESTAMP(3),
>        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>        |) WITH (
>        |
>        |    'connector.type' = 'kafka',
>        |    'connector.version' = 'universal',
>        |    'connector.topic' = 'user',
>        |    'connector.startup-mode' = 'latest-offset',
>        |    'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>        |    'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>        |    'connector.properties.group.id' = 'user_flink',
>        |    'format.type' = 'json',
>        |    'format.derive-schema' = 'true'
>        |
>        |)
>        |""".stripMargin)
> 
> New options:
> 
>    streamTableEnv.executeSql(
>      """
>        |
>        |CREATE TABLE kafka_table (
>        |
>        |    uid BIGINT,
>        |    sex VARCHAR,
>        |    age INT,
>        |    created_time TIMESTAMP(3),
>        |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>        |) WITH (
>        |    'connector' = 'kafka',
>        |    'topic' = 'user',
>        |    'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>        |    'properties.group.id' = 'user_flink',
>        |    'scan.startup.mode' = 'latest-offset',
>        |    'format' = 'json',
>        |    'json.fail-on-missing-field' = 'false',
>        |    'json.ignore-parse-errors' = 'true'
>        |)
>        |""".stripMargin)
