[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689587#comment-16689587 ] ASF GitHub Bot commented on FLINK-10843: asfgit closed pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 5f1112706ce..effd913707e 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -40,14 +40,15 @@ The following table list all available connectors and formats. Their mutual comp ### Connectors -| Name | Version | Maven dependency | SQL Client JAR | -| : | : | :--- | :--| -| Filesystem| | Built-in | Built-in | -| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | -| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | -| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | -| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | -| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Name | Version | Maven dependency | SQL Client JAR | +| : | :-- | :--- | :--| +| Filesystem| | Built-in | Built-in | +| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | +| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.10| `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.11| `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.11+ (`universal`) | `flink-connector-kafka` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | ### Formats @@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t {% highlight java %} .connect( new Kafka() -.version("0.11")// required: valid connector versions are "0.8", "0.9", "0.10", and "0.11" +.version("0.11")// required: valid connector versions are +// "0.8", "0.9", "0.10", "0.11", and "universal" .topic("...") // required: topic name from which the table is read // optional: connector specific properties @@ -549,7 +551,8 @@
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689586#comment-16689586 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#issuecomment-439437076 Thanks @pnowojski. Will merge this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689534#comment-16689534 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r234237406 ## File path: flink-end-to-end-tests/test-scripts/test_sql_client_kafka010.sh ## @@ -19,4 +19,4 @@ set -Eeuo pipefail -source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10" +source "$(dirname "$0")"/test_sql_client_kafka_common.sh 0.10 0.10.2.0 3.2.0 3.2 "kafka-0.10" "0.10" Review comment: This starts getting ridiculous: - `0.10` - `0.10.2.0` - `"0.10"` - `"kafka-0.10"` or - `3.2` - `3.2.0` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689159#comment-16689159 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on issue #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#issuecomment-439317653 @pnowojski I updated the PR. Now we just matched against the version `universal`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686205#comment-16686205 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233353227 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -40,7 +41,9 @@ public static final String CONNECTOR_VERSION_VALUE_09 = "0.9"; public static final String CONNECTOR_VERSION_VALUE_010 = "0.10"; public static final String CONNECTOR_VERSION_VALUE_011 = "0.11"; - public static final String CONNECTOR_VERSION_VALUE_20 = "2.0"; + public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = "universal"; Review comment: This is the one and only place that should be touched when modifying properties. The goal is to have all properties and potential values in one central place. In a version-agnostic manner. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686201#comment-16686201 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233351993 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -187,9 +190,21 @@ // /** -* Returns the Kafka version. +* Returns a hard-coded Kafka version. +* +* @deprecated This method can be dropped once we only maintain the universal Kafka connector. */ - protected abstract String kafkaVersion(); + @Deprecated + protected abstract Optional kafkaVersion(); + + /** +* Returns the pattern that a validator should accept as Kafka version. +*/ + protected Pattern[] kafkaVersionPattern() { Review comment: I revert my comment from above. The explicit version is still necessary since Kafka module `10` depends on module `9`, thus both factories are present in a Kafka 0.10 module. The two factories would interfere with each other if a hard-coded version is not part of the context. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686200#comment-16686200 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233351993 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -187,9 +190,21 @@ // /** -* Returns the Kafka version. +* Returns a hard-coded Kafka version. +* +* @deprecated This method can be dropped once we only maintain the universal Kafka connector. */ - protected abstract String kafkaVersion(); + @Deprecated + protected abstract Optional kafkaVersion(); + + /** +* Returns the pattern that a validator should accept as Kafka version. +*/ + protected Pattern[] kafkaVersionPattern() { Review comment: I revert my comment from above. The explicit version is still necessary since version `10` depends on version `9`, thus both factories are present in a Kafka 0.10 module. The two factories would interfere with each other if a hard-coded version is not part of the context. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686199#comment-16686199 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233351993 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -187,9 +190,21 @@ // /** -* Returns the Kafka version. +* Returns a hard-coded Kafka version. +* +* @deprecated This method can be dropped once we only maintain the universal Kafka connector. */ - protected abstract String kafkaVersion(); + @Deprecated + protected abstract Optional kafkaVersion(); + + /** +* Returns the pattern that a validator should accept as Kafka version. +*/ + protected Pattern[] kafkaVersionPattern() { Review comment: I revert my comment from a above. The explicit version is still necessary since version `10` depends on version `9`, thus both factories are present in a Kafka 0.10 module. The two factories would interfere with each other if a hard-coded version is not part of the context. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686197#comment-16686197 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233350930 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -187,9 +190,21 @@ // /** -* Returns the Kafka version. +* Returns a hard-coded Kafka version. +* +* @deprecated This method can be dropped once we only maintain the universal Kafka connector. */ - protected abstract String kafkaVersion(); + @Deprecated + protected abstract Optional kafkaVersion(); + + /** +* Returns the pattern that a validator should accept as Kafka version. +*/ + protected Pattern[] kafkaVersionPattern() { Review comment: The reason for two separate methods was that the previous factories explicitly matched only if the version was correct. But I agree that the logic might have been unnecessarily complicated. I removed the `kafkaVersion()` method. I think `Pattern` is still fine because is not a dependency and also not exposed as factories are a pure Flink internal thing. Once we drop the old Kafka connectors we don't need to "expose" the pattern anymore. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686187#comment-16686187 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233347043 ## File path: docs/dev/table/connect.md ## @@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t {% highlight java %} .connect( new Kafka() -.version("0.11")// required: valid connector versions are "0.8", "0.9", "0.10", and "0.11" +.version("0.11")// required: connector major version +// such as "0.8", "0.9", "0.10", "0.11", or "2.0" Review comment: The "such as" indicates that these are just examples. The list of supported major version can be found in the dependency section. I added 1.0 as an additional example. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685468#comment-16685468 ] ASF GitHub Bot commented on FLINK-10843: twalthr commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233129481 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -104,6 +106,7 @@ List properties = new ArrayList<>(); // kafka + properties.add(CONNECTOR_VERSION); Review comment: This line is relevant. If it is not defined in the context, it must be added to the supported properties. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685413#comment-16685413 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233097356 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -249,7 +264,7 @@ private DescriptorProperties getValidatedProperties(Map properti // allow Kafka timestamps to be used, watermarks can not be received from source new SchemaValidator(true, supportsKafkaTimestamps(), false).validate(descriptorProperties); - new KafkaValidator().validate(descriptorProperties); + new KafkaValidator(kafkaVersionPattern()).validate(descriptorProperties); Review comment: It's kind of strange that we validate the kafka version after the fact that we have already selected some kafka version (after all `this` instance has already a known Kafka version). This seems like either a duplicated logic or a kind of unit test that is perform during the runtime. Maybe it could be avoided/removed? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685414#comment-16685414 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233103572 ## File path: docs/dev/table/connect.md ## @@ -524,7 +525,8 @@ The Kafka connector allows for reading and writing from and to an Apache Kafka t {% highlight java %} .connect( new Kafka() -.version("0.11")// required: valid connector versions are "0.8", "0.9", "0.10", and "0.11" +.version("0.11")// required: connector major version +// such as "0.8", "0.9", "0.10", "0.11", or "2.0" Review comment: this kind of suggests that `1.0` or `1.1` is not supported. Maybe we should change it to ``` // such as "0.8", "0.9", "0.10", "0.11" ... ``` or add a separate table with list of supported major versions, including a caveat that even future versions should be supported by "universal" connector? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685415#comment-16685415 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233090756 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -187,9 +190,21 @@ // /** -* Returns the Kafka version. +* Returns a hard-coded Kafka version. +* +* @deprecated This method can be dropped once we only maintain the universal Kafka connector. */ - protected abstract String kafkaVersion(); + @Deprecated + protected abstract Optional kafkaVersion(); + + /** +* Returns the pattern that a validator should accept as Kafka version. +*/ + protected Pattern[] kafkaVersionPattern() { Review comment: Do we need two separate methods for that? Why can not we use just the more general one `kafkaVersionPattern`? Or maybe even better: ``` protected abstract boolean isCompatible(String kafkaVersionString); ``` which can be implemented either as `kafkaVersionString.equals("0.11")` or by patterns, without exposing the `Pattern` dependency and also being even more flexible? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685416#comment-16685416 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233098151 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ## @@ -40,7 +41,9 @@ public static final String CONNECTOR_VERSION_VALUE_09 = "0.9"; public static final String CONNECTOR_VERSION_VALUE_010 = "0.10"; public static final String CONNECTOR_VERSION_VALUE_011 = "0.11"; - public static final String CONNECTOR_VERSION_VALUE_20 = "2.0"; + public static final String CONNECTOR_VERSION_VALUE_UNIVERSAL = "universal"; Review comment: Shouldn't those constant be moved to specific kafka connectors? They are not used anywhere else (as it is now, this is one extra place which needs to be modified when adding new Kafka connector). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685417#comment-16685417 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233101708 ## File path: docs/dev/table/connect.md ## @@ -583,7 +586,9 @@ connector: **Consistency guarantees:** By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({{ site.baseurl }}/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing). -**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively. +**Kafka 0.10+ Timestamps:** Since Kafka 0.10, Kafka messages have a timestamp as metadata that specifies when the record was written into the Kafka topic. These timestamps can be used for a [rowtime attribute](connect.html#defining-the-schema) by selecting `timestamps: from-source` in YAML and `timestampsFromSource()` in Java/Scala respectively. + +**Kafka 0.11+ Versioning:** If the Kafka connector definition should be independent of a hard-coded Kafka version, use the connector version `universal` as a wildcard for Flink's modern Kafka connector. Review comment: Do not use `modern` - at some point of time it will not be modern anymore. `universal`? `for Flink's Kafka connector that is compatible with all Kafka versions starting from 0.11`.? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685412#comment-16685412 ] ASF GitHub Bot commented on FLINK-10843: pnowojski commented on a change in pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087#discussion_r233089061 ## File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ## @@ -104,6 +106,7 @@ List properties = new ArrayList<>(); // kafka + properties.add(CONNECTOR_VERSION); Review comment: Is this line relevant or is it a separate bug fix? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10843) Make Kafka version definition more flexible for new Kafka table factory
[ https://issues.apache.org/jira/browse/FLINK-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685165#comment-16685165 ] ASF GitHub Bot commented on FLINK-10843: twalthr opened a new pull request #7087: [FLINK-10843] [connectors] Make Kafka table factory versioning more flexible URL: https://github.com/apache/flink/pull/7087 ## What is the purpose of the change This PR makes the versioning of a Kafka table factory more flexible. It still forces a Kafka version for future backwards compatibility cases. It allows for `1.*` and `2.*` versioning for the modern Kafka connector. However, a user can always fallback to a `universal` version wildcard. ## Brief change log - Extended Kafka validator - Extended Kafka table factory ## Verifying this change Existing tests are still correct. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? docs This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make Kafka version definition more flexible for new Kafka table factory > --- > > Key: FLINK-10843 > URL: https://issues.apache.org/jira/browse/FLINK-10843 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Currently, a user has to specify a specific version for a Kafka connector > like: > {code} > connector: > type: kafka > version: "0.11" # required: valid connector versions are "0.8", "0.9", > "0.10", and "0.11" > topic: ... # required: topic name from which the table is read > {code} > However, the new Kafka connector aims to be universal, thus, at least for 1.x > and 2.x versions which we should support those as parameters as well. > Currently, {{2.0}} is the only accepted string for the factory. -- This message was sent by Atlassian JIRA (v7.6.3#76005)