Hi 你说的下游消费不到数据,这个下游是指当前作业消费不到数据吗?
正常应该不会的,可以提供个可复现代码吗? 祝好 Leonard Xu > 在 2020年7月23日,18:13,Zhou Zach <[email protected]> 写道: > > Hi all, > > 根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, > 使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊 > > > 老参数: > streamTableEnv.executeSql( > """ > | > |CREATE TABLE kafka_table ( > | uid BIGINT, > | sex VARCHAR, > | age INT, > | created_time TIMESTAMP(3), > | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND > |) WITH ( > | > | 'connector.type' = 'kafka', > | 'connector.version' = 'universal', > | 'connector.topic' = 'user', > | 'connector.startup-mode' = 'latest-offset', > | 'connector.properties.zookeeper.connect' = > 'cdh1:2181,cdh2:2181,cdh3:2181', > | 'connector.properties.bootstrap.servers' = > 'cdh1:9092,cdh2:9092,cdh3:9092', > | 'connector.properties.group.id' = 'user_flink', > | 'format.type' = 'json', > | 'format.derive-schema' = 'true' > | > |) > |""".stripMargin) > > 新参数: > > streamTableEnv.executeSql( > """ > | > |CREATE TABLE kafka_table ( > | > | uid BIGINT, > | sex VARCHAR, > | age INT, > | created_time TIMESTAMP(3), > | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND > |) WITH ( > | 'connector' = 'kafka', > | 'topic' = 'user', > | 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092', > | 'properties.group.id' = 'user_flink', > | 'scan.startup.mode' = 'latest-offset', > | 'format' = 'json', > | 'json.fail-on-missing-field' = 'false', > | 'json.ignore-parse-errors' = 'true' > |) > |""".stripMargin)
