Hi all,
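Following the documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position,
I created kafka_table with the new options, but the downstream job consumes no data. With the old options, the downstream job consumes data as expected. Is there a pitfall in the new-style options?
Old options: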
streamTableEnv.executeSql(
"""
|
|CREATE TABLE kafka_table (
| uid BIGINT,
| sex VARCHAR,
| age INT,
| created_time TIMESTAMP(3),
| WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
|) WITH (
|
| 'connector.type' = 'kafka',
| 'connector.version' = 'universal',
| 'connector.topic' = 'user',
| 'connector.startup-mode' = 'latest-offset',
| 'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
| 'connector.properties.group.id' = 'user_flink',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|
|)
|""".stripMargin)
New options:
streamTableEnv.executeSql(
"""
|
|CREATE TABLE kafka_table (
|
| uid BIGINT,
| sex VARCHAR,
| age INT,
| created_time TIMESTAMP(3),
| WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'user',
| 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
| 'properties.group.id' = 'user_flink',
| 'scan.startup.mode' = 'latest-offset',
| 'format' = 'json',
| 'json.fail-on-missing-field' = 'false',
| 'json.ignore-parse-errors' = 'true'
|)
|""".stripMargin)
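
One way to narrow this down (a sketch under assumptions, not a confirmed fix): the new-options table sets 'json.ignore-parse-errors' = 'true', so records or fields that fail to parse are skipped or set to null rather than raising an error, which could look like "no data" downstream. The variant below keeps every option from the new-options table and only flips that flag to 'false', so a parse failure would surface as an exception. The object name KafkaNewOptionsDebug and the table name kafka_table_debug are made up for illustration, and the environment setup assumes Flink 1.11 with the Blink planner.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Hypothetical debugging job; the names here are illustrative and not from the original post.
object KafkaNewOptionsDebug {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Assumption: Flink 1.11 with the Blink planner in streaming mode.
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val streamTableEnv = StreamTableEnvironment.create(env, settings)

    // Same schema and options as the new-options table above,
    // except that parse errors now fail the job instead of being ignored.
    streamTableEnv.executeSql(
      """
        |CREATE TABLE kafka_table_debug (
        | uid BIGINT,
        | sex VARCHAR,
        | age INT,
        | created_time TIMESTAMP(3),
        | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 'user',
        | 'properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
        | 'properties.group.id' = 'user_flink',
        | 'scan.startup.mode' = 'latest-offset',
        | 'format' = 'json',
        | 'json.fail-on-missing-field' = 'false',
        | 'json.ignore-parse-errors' = 'false'
        |)
        |""".stripMargin)

    // Print whatever the source emits; with 'latest-offset', only records
    // produced after the job starts will show up.
    streamTableEnv.executeSql("SELECT * FROM kafka_table_debug").print()
  }
}
```

If this variant throws while parsing created_time, the timestamp layout in the messages is worth checking against what the 1.11 json format expects. If it prints rows normally, the cause is more likely elsewhere, for example in the downstream query or the watermark.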