[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264505#comment-16264505 ] Timo Walther commented on FLINK-8118: - Fixed in 1.4: 2fb24581a1775084e3be8c2575c129d250f39313 & 13631b9617d32e46eba51c9125019ec5e77c39f3 > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > Fix For: 1.4.0, 1.5.0 > > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264354#comment-16264354 ] ASF GitHub Bot commented on FLINK-8118: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5056 > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > Fix For: 1.4.0 > > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264310#comment-16264310 ] ASF GitHub Bot commented on FLINK-8118: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152797724 --- Diff: docs/dev/table/sourceSinks.md --- @@ -264,6 +288,30 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder() +* **Specify the start reading position:** By default, the table source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions via the builder's methods, which correspond to the configurations in section [Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration). + + + +{% highlight java %} +TableSource source = Kafka010JsonTableSource.builder() --- End diff -- Oops, sorry about that and thanks for pointing it out. > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264305#comment-16264305 ] ASF GitHub Bot commented on FLINK-8118: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152795641 --- Diff: docs/dev/table/sourceSinks.md --- @@ -264,6 +288,30 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder() +* **Specify the start reading position:** By default, the table source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions via the builder's methods, which correspond to the configurations in section [Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration). + + + +{% highlight java %} +TableSource source = Kafka010JsonTableSource.builder() --- End diff -- This should be Kafka010AvroTableSource. > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264257#comment-16264257 ] ASF GitHub Bot commented on FLINK-8118: --- Github user xccui commented on the issue: https://github.com/apache/flink/pull/5056 Hi @fhueske, thanks for the review! The PR has been updated according to your comments. Thanks, Xingcan > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264139#comment-16264139 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152763735 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute( return builder(); } + /** +* Configures the TableSource to start reading from the earliest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromEarliest() +*/ + public B startReadingFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + this.specificStartupOffsets = null; + return builder(); + } + + /** +* Configures the TableSource to start reading from the latest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromLatest() +*/ + public B startReadingFromLatest() { + this.startupMode = StartupMode.LATEST; + this.specificStartupOffsets = null; + return builder(); + } + + /** +* Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers. +* +* @see FlinkKafkaConsumerBase#setStartFromGroupOffsets() +*/ + public B startReadingFromGroupOffsets() { + this.startupMode = StartupMode.GROUP_OFFSETS; + this.specificStartupOffsets = null; + return builder(); + } + + /** +* Configures the TableSource to start reading partitions from specific offsets, set independently for each partition. +* +* @param specificStartupOffsets the specified offsets for partitions +* @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map) +*/ + public B startReadingFromSpecificOffsets(MapspecificStartupOffsets) { --- End diff -- shorten to `fromSpecificOffsets`? > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264142#comment-16264142 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152761700 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -163,14 +203,14 @@ protected void setRowtimeAttributeDescriptors(List r ABSTRACT METHODS FOR SUBCLASSES /** -* Returns the version-specific Kafka consumer. +* Create a version-specific Kafka consumer. --- End diff -- Create -> Creates > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264136#comment-16264136 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152760280 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -121,6 +130,37 @@ public String getProctimeAttribute() { return rowtimeAttributeDescriptors; } + /** +* Returns a version-specific Kafka consumer with the start position configured. +* +* @param topic Kafka topic to consume. +* @param propertiesProperties for the Kafka consumer. +* @param deserializationSchema Deserialization schema to use for Kafka records. +* @return The version-specific Kafka consumer +*/ + public FlinkKafkaConsumerBase getKafkaConsumer( --- End diff -- Should be `protected` > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264140#comment-16264140 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152763593 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute( return builder(); } + /** +* Configures the TableSource to start reading from the earliest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromEarliest() +*/ + public B startReadingFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + this.specificStartupOffsets = null; + return builder(); + } + + /** +* Configures the TableSource to start reading from the latest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromLatest() +*/ + public B startReadingFromLatest() { --- End diff -- shorten to `fromLatestOffsets`? > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264141#comment-16264141 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152763651 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute( return builder(); } + /** +* Configures the TableSource to start reading from the earliest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromEarliest() +*/ + public B startReadingFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + this.specificStartupOffsets = null; + return builder(); + } + + /** +* Configures the TableSource to start reading from the latest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromLatest() +*/ + public B startReadingFromLatest() { + this.startupMode = StartupMode.LATEST; + this.specificStartupOffsets = null; + return builder(); + } + + /** +* Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers. +* +* @see FlinkKafkaConsumerBase#setStartFromGroupOffsets() +*/ + public B startReadingFromGroupOffsets() { --- End diff -- shorten to `fromGroupOffsets`? > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264137#comment-16264137 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152763535 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute( return builder(); } + /** +* Configures the TableSource to start reading from the earliest offset for all partitions. +* +* @see FlinkKafkaConsumerBase#setStartFromEarliest() +*/ + public B startReadingFromEarliest() { --- End diff -- shorten to `fromEarliestOffsets`? > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264138#comment-16264138 ] ASF GitHub Bot commented on FLINK-8118: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5056#discussion_r152762409 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -68,6 +71,12 @@ /** Descriptor for a rowtime attribute. */ private List rowtimeAttributeDescriptors; + /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ + protected StartupMode startupMode; --- End diff -- should be `private`. We use `protected` setter methods to set optional parameters from the builder (see `proctimeAttribute`). > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264040#comment-16264040 ] ASF GitHub Bot commented on FLINK-8118: --- GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5056 [FLINK-8118] [table] Allow specifying reading offsets of KafkaTableSources ## What is the purpose of the change This PR enables the kafka table source builder to specify start reading offsets for the `KafkaTableSource`. ## Brief change log - Add offset specifying methods (reading from latest, earliest, group offsets and specified offsets) to the builder. - Configure the offset option in a new `getKafkaConsumer` and change the old `getKafkaConsumer` to `createKafkaConsumer`. - Add a related test and update the documents. ## Verifying this change The change can be verified by `KafkaTableSourceTestBase. testKafkaTSSetConsumeOffsets`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-8118 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5056.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5056 commit 3fd22f85af170f20de4126abd5182abf29970ca8 Author: Xingcan CuiDate: 2017-11-22T16:00:39Z [FLINK-8118][table]Allow to specify the offsets of KafkaTableSources > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262563#comment-16262563 ] Xingcan Cui commented on FLINK-8118: Hi [~fhueske], thanks for the suggestions! Since the existing configurations in {{KafkaTableSource.Builder}} are all about the {{TableSource}} itself while the starting offsets are set for the inner {{FlinkKafkaConsumerBase}}, the code may be a little verbose. Anyway, I'll create a PR soon and let's discuss that later. Thanks, Xingcan > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262211#comment-16262211 ] Fabian Hueske commented on FLINK-8118: -- Hi [~xccui], 1. I think offsets should be configured via the builder as an optional parameters. IMO, we should not split the configuration into a builder part and a non-builder part. 2. For testing, you can use Mockito to replace the {{FlinkConsumer}} which is returned from {{getKafkaConsumer}} by a mock and check if the correct method to configure the offsets is called. I did something similar to test the configuration of the Orc readers in the [OrcRowInputFormatTest|https://github.com/fhueske/flink/blob/d80506e3785268f541457a69ade3118c634cf7e7/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java]. Best, Fabian > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261951#comment-16261951 ] Xingcan Cui commented on FLINK-8118: Hi [~twalthr], thanks for raising this. I got two questions about this issue. # Shall we integrate the methods into the {{KafkaTableSource.Builder}} or directly into the {{KafkaTableSource}}? Personally, I prefer the later one since it will be more flexible, while that seems to break the design pattern to some extent. # Since the {{startupMode}} in {{FlinkKafkaConsumerBase}} is invisible from outer classes, do you have some suggestions on testing this? Thanks, Xingcan > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)