[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597186#comment-17597186 ]

Martijn Visser commented on FLINK-25336:

[~sunnny] Yes

> Kafka connector compatible problem in Flink sql
> ---
>
> Key: FLINK-25336
> URL: https://issues.apache.org/jira/browse/FLINK-25336
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.14.0
> Environment: Flink 1.14.0
>              Kafka 0.10.2.1
> Reporter: Yuan Zhu
> Priority: Minor
> Labels: Flink-sql, Kafka, flink
> Attachments: log.jpg
>
>
> When I use sql to query kafka table, like
> {code:java}
> create table `kfk`
> (
>   user_id VARCHAR
> ) with (
>   'connector' = 'kafka',
>   'topic' = 'test',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'json',
>   'scan.startup.mode' = 'timestamp',
>   'scan.startup.timestamp-millis' = '163941120',
>   'properties.group.id' = 'test'
> );
> CREATE TABLE print_table (user_id varchar) WITH ('connector' = 'print');
> insert into print_table select user_id from kfk;{code}
> It will encounter an exception:
> org.apache.kafka.common.errors.UnsupportedVersionException: MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field !log.jpg!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17585169#comment-17585169 ]

zhilinli commented on FLINK-25336:

Is upgrading Kafka the only solution in the end?
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460562#comment-17460562 ]

Yuan Zhu commented on FLINK-25336:

[~MartijnVisser] [~JasonLee] Thanks for your help. I get it.
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460512#comment-17460512 ]

Martijn Visser commented on FLINK-25336:

[~straw] Thanks for pointing that out. The documentation is not correct (anymore).

Flink 1.14 ships with support for the Kafka Client 2.4.1, see https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/pom.xml#L39

Flink 1.15 will ship with support for Kafka Client 2.8, see https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/pom.xml#L39
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460493#comment-17460493 ]

Yuan Zhu commented on FLINK-25336:

[~MartijnVisser] Thanks. What you linked describes the compatibility between Confluent Platform and Apache Kafka versions. Is there a list of the Kafka versions that Flink supports? I could only find [https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#dependency]. According to the wording there, the clients are backwards compatible with broker versions 0.10.0 or later. In practice, the setting that the connector forces does not work with broker version 0.10.2. So only some higher version is actually supported, and I would like to know which one.
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460465#comment-17460465 ]

Martijn Visser commented on FLINK-25336:

[~straw] This version is not supported anymore per https://docs.confluent.io/platform/current/installation/versions-interoperability.html

We always try to keep as much backwards compatibility as possible, but in order to support newer versions with specific features or bug fixes, there is a point where we can't support older versions anymore.
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460431#comment-17460431 ]

Yuan Zhu commented on FLINK-25336:

[~JasonLee] Why not make it configurable so that more versions are supported? Upgrading the Kafka version is troublesome.
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460395#comment-17460395 ]

JasonLee commented on FLINK-25336:

[~straw] In fact, this is because the Kafka version is too old. Just upgrade to a newer version.
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460391#comment-17460391 ]

Yuan Zhu commented on FLINK-25336:

Hi, [~becket_qin]. What is the purpose of forcing this setting? Can we make ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG configurable?
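One way to read "configurable" in the question above is to treat "false" as a default that applies only when the user has not set the option. The following is a minimal sketch of that idea using plain java.util.Properties, not Flink's actual code. The class ConfigurableDefaultSketch and its method buildConsumerProps are hypothetical names; the key string "allow.auto.create.topics" is the value of ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG in the Kafka client.

```java
import java.util.Properties;

// Hypothetical sketch, not part of Flink: shows "default unless the user set it".
class ConfigurableDefaultSketch {
    // String value of ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG in the Kafka client.
    static final String ALLOW_AUTO_CREATE_TOPICS = "allow.auto.create.topics";

    // Copies the user properties and applies "false" only if the key is absent.
    static Properties buildConsumerProps(Properties userProps) {
        Properties consumerProps = new Properties();
        for (String key : userProps.stringPropertyNames()) {
            consumerProps.setProperty(key, userProps.getProperty(key));
        }
        if (consumerProps.getProperty(ALLOW_AUTO_CREATE_TOPICS) == null) {
            consumerProps.setProperty(ALLOW_AUTO_CREATE_TOPICS, "false");
        }
        return consumerProps;
    }

    public static void main(String[] args) {
        Properties withUserValue = new Properties();
        withUserValue.setProperty(ALLOW_AUTO_CREATE_TOPICS, "true");
        // The user-supplied value is kept.
        System.out.println(buildConsumerProps(withUserValue).getProperty(ALLOW_AUTO_CREATE_TOPICS));

        // With no user value, the default is applied.
        System.out.println(buildConsumerProps(new Properties()).getProperty(ALLOW_AUTO_CREATE_TOPICS));
    }
}
```

Under this assumption, a table option such as 'properties.allow.auto.create.topics' = 'true' in the DDL would survive instead of being replaced. Whether that would be acceptable for the connector is a design question that this thread leaves open.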
[jira] [Commented] (FLINK-25336) Kafka connector compatible problem in Flink sql
[ https://issues.apache.org/jira/browse/FLINK-25336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460387#comment-17460387 ]

Yuan Zhu commented on FLINK-25336:

This exception is caused by the logic in KafkaSourceEnumerator#getKafkaConsumer. At line 420, the property is overwritten:
{code:java}
private KafkaConsumer getKafkaConsumer() {
    Properties consumerProps = new Properties();
    deepCopyProperties(properties, consumerProps);
    ……
    // Set unconditionally, after the user properties have been copied.
    consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
    return new KafkaConsumer<>(consumerProps);
}
{code}
As a result, a value configured for this option in the SQL DDL has no effect.
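The effect described in this comment can be reproduced with plain java.util.Properties: a setProperty call made after the user properties are copied always wins. This is a minimal sketch of that effect, not the Flink source. The class OverrideDemo and its method buildConsumerProps are hypothetical stand-ins for the enumerator logic, and the key string "allow.auto.create.topics" is the value of ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG in the Kafka client.

```java
import java.util.Properties;

// Hypothetical stand-in for the enumerator logic quoted in the comment.
class OverrideDemo {
    // String value of ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG in the Kafka client.
    static final String ALLOW_AUTO_CREATE_TOPICS = "allow.auto.create.topics";

    // Copies the user properties, then forces the flag to "false" unconditionally.
    static Properties buildConsumerProps(Properties userProps) {
        Properties consumerProps = new Properties();
        for (String key : userProps.stringPropertyNames()) {
            consumerProps.setProperty(key, userProps.getProperty(key));
        }
        consumerProps.setProperty(ALLOW_AUTO_CREATE_TOPICS, "false");
        return consumerProps;
    }

    public static void main(String[] args) {
        Properties userProps = new Properties();
        userProps.setProperty("bootstrap.servers", "localhost:9092");
        userProps.setProperty(ALLOW_AUTO_CREATE_TOPICS, "true");

        Properties effective = buildConsumerProps(userProps);
        // prints "false": the user-supplied "true" was replaced
        System.out.println(effective.getProperty(ALLOW_AUTO_CREATE_TOPICS));
    }
}
```

This matches the exception text in the issue description, which suggests that the client rejects a request carrying allowAutoTopicCreation = false when the broker only supports MetadataRequest versions older than 4.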