[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread Timo Walther (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264505#comment-16264505 ]

Timo Walther commented on FLINK-8118:
-

Fixed in 1.4: 2fb24581a1775084e3be8c2575c129d250f39313 & 
13631b9617d32e46eba51c9125019ec5e77c39f3

> Allow to specify the offsets of KafkaTableSources
> -
>
> Key: FLINK-8118
> URL: https://issues.apache.org/jira/browse/FLINK-8118
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Xingcan Cui
> Fix For: 1.4.0, 1.5.0
>
>
> Right now the Kafka TableSources can only start reading from the current group offsets.
> We should expose the start position options that the Kafka consumer already offers:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
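The start positions the issue asks to expose can be summarized with a small, self-contained Java model. This is a hypothetical sketch rather than Flink code: the `StartupMode` constant names mirror the enum in Flink's Kafka connector, but `PartitionState`, `StartPositions.resolveStartOffset`, the string partition keys (such as `"topic-0"`), and the `resetToEarliest` flag, which stands in for the `auto.offset.reset` fallback, are illustrative assumptions. It follows the fallback described in the linked consumer documentation (a partition missing from the specific-offsets map falls back to group offsets) and ignores restores from checkpoints or savepoints, where the checkpointed offsets take precedence.

```java
import java.util.Map;

/** Start positions; the constant names mirror Flink's Kafka connector (simplified model). */
enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

/** Hypothetical snapshot of the offsets known for one partition. */
final class PartitionState {
    final long earliest;  // oldest offset still retained by the broker
    final long latest;    // offset that the next produced record will get
    final Long committed; // committed group offset, or null if the group never committed one

    PartitionState(long earliest, long latest, Long committed) {
        this.earliest = earliest;
        this.latest = latest;
        this.committed = committed;
    }
}

final class StartPositions {
    private StartPositions() {}

    /**
     * Returns the offset of the first record to read from one partition.
     * resetToEarliest is a hypothetical flag standing in for auto.offset.reset.
     */
    static long resolveStartOffset(StartupMode mode, String partition, PartitionState state,
                                   Map<String, Long> specificOffsets, boolean resetToEarliest) {
        switch (mode) {
            case EARLIEST:
                return state.earliest;
            case LATEST:
                return state.latest;
            case SPECIFIC_OFFSETS:
                Long specific = specificOffsets.get(partition);
                if (specific != null) {
                    return specific;
                }
                // A partition without an explicit offset falls back to the group-offset behavior.
                return resolveStartOffset(StartupMode.GROUP_OFFSETS, partition, state,
                        specificOffsets, resetToEarliest);
            case GROUP_OFFSETS:
            default:
                if (state.committed != null) {
                    return state.committed;
                }
                // No committed offset: fall back to earliest or latest, like auto.offset.reset.
                return resetToEarliest ? state.earliest : state.latest;
        }
    }
}
```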



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264354#comment-16264354 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5056




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264310#comment-16264310 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152797724
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -264,6 +288,30 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
 
 
 
+* **Specify the start reading position:** By default, the table source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions via the builder's methods, which correspond to the configurations in section [Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
--- End diff --

Oops, sorry about that and thanks for pointing it out.




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264305#comment-16264305 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152795641
  
--- Diff: docs/dev/table/sourceSinks.md ---
@@ -264,6 +288,30 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
 
 
 
+* **Specify the start reading position:** By default, the table source will start reading data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other start positions via the builder's methods, which correspond to the configurations in section [Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
--- End diff --

This should be Kafka010AvroTableSource.




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264257#comment-16264257 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5056
  
Hi @fhueske, thanks for the review! The PR has been updated according to 
your comments.

Thanks, Xingcan




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264139#comment-16264139 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152763735
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute(
         return builder();
     }
 
+    /**
+     * Configures the TableSource to start reading from the earliest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+     */
+    public B startReadingFromEarliest() {
+        this.startupMode = StartupMode.EARLIEST;
+        this.specificStartupOffsets = null;
+        return builder();
+    }
+
+    /**
+     * Configures the TableSource to start reading from the latest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromLatest()
+     */
+    public B startReadingFromLatest() {
+        this.startupMode = StartupMode.LATEST;
+        this.specificStartupOffsets = null;
+        return builder();
+    }
+
+    /**
+     * Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
+     */
+    public B startReadingFromGroupOffsets() {
+        this.startupMode = StartupMode.GROUP_OFFSETS;
+        this.specificStartupOffsets = null;
+        return builder();
+    }
+
+    /**
+     * Configures the TableSource to start reading partitions from specific offsets, set independently for each partition.
+     *
+     * @param specificStartupOffsets the specified offsets for partitions
+     * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
+     */
+    public B startReadingFromSpecificOffsets(Map specificStartupOffsets) {
--- End diff --

shorten to `fromSpecificOffsets`?
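The four builder methods in this diff share one pattern: each sets the startup mode, discards any previously given specific offsets, and returns the builder typed as its concrete subclass through `builder()`. Below is a minimal, self-contained sketch of that pattern, not the Flink implementation. The method names are taken from the diff (the review proposes shorter ones), while `SourceBuilder`, `JsonSourceBuilder`, `StartConfig`, `buildStartConfig()`, the string partition keys, and the defensive copy of the offsets map are hypothetical choices made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Startup modes as named in the diff above. */
enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

/** Hypothetical immutable result of the builder. */
final class StartConfig {
    final StartupMode mode;
    final Map<String, Long> specificOffsets; // empty unless mode == SPECIFIC_OFFSETS

    StartConfig(StartupMode mode, Map<String, Long> specificOffsets) {
        this.mode = mode;
        this.specificOffsets = specificOffsets;
    }
}

/** Self-typed builder: B is the concrete subclass, so chained calls keep its type. */
abstract class SourceBuilder<B extends SourceBuilder<B>> {
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS; // default: committed group offsets
    private Map<String, Long> specificStartupOffsets;             // null unless SPECIFIC_OFFSETS

    public B startReadingFromEarliest() {
        return setStartPosition(StartupMode.EARLIEST, null);
    }

    public B startReadingFromLatest() {
        return setStartPosition(StartupMode.LATEST, null);
    }

    public B startReadingFromGroupOffsets() {
        return setStartPosition(StartupMode.GROUP_OFFSETS, null);
    }

    public B startReadingFromSpecificOffsets(Map<String, Long> offsets) {
        // Copy so that later changes to the caller's map do not leak into the builder.
        return setStartPosition(StartupMode.SPECIFIC_OFFSETS, new HashMap<>(offsets));
    }

    // The last call wins: switching modes drops offsets from an earlier specific call.
    private B setStartPosition(StartupMode mode, Map<String, Long> offsets) {
        this.startupMode = mode;
        this.specificStartupOffsets = offsets;
        return builder();
    }

    StartConfig buildStartConfig() {
        Map<String, Long> offsets = specificStartupOffsets == null
                ? Collections.<String, Long>emptyMap()
                : Collections.unmodifiableMap(specificStartupOffsets);
        return new StartConfig(startupMode, offsets);
    }

    /** Returns this builder typed as the concrete subclass. */
    protected abstract B builder();
}

/** Hypothetical concrete builder, standing in for a version-specific one. */
final class JsonSourceBuilder extends SourceBuilder<JsonSourceBuilder> {
    @Override
    protected JsonSourceBuilder builder() {
        return this;
    }
}
```

For example, `new JsonSourceBuilder().startReadingFromSpecificOffsets(offsets).startReadingFromLatest()` ends in `LATEST` mode with no specific offsets, because the last start-position call wins. Routing all four methods through one private helper keeps that rule in a single place.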




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264142#comment-16264142 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152761700
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -163,14 +203,14 @@ protected void setRowtimeAttributeDescriptors(List r
 ABSTRACT METHODS FOR SUBCLASSES
 
/**
-* Returns the version-specific Kafka consumer.
+* Create a version-specific Kafka consumer.
--- End diff --

Create -> Creates




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264136#comment-16264136 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152760280
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -121,6 +130,37 @@ public String getProctimeAttribute() {
         return rowtimeAttributeDescriptors;
     }
 
+    /**
+     * Returns a version-specific Kafka consumer with the start position configured.
+     *
+     * @param topic                 Kafka topic to consume.
+     * @param properties            Properties for the Kafka consumer.
+     * @param deserializationSchema Deserialization schema to use for Kafka records.
+     * @return The version-specific Kafka consumer
+     */
+    public FlinkKafkaConsumerBase getKafkaConsumer(
--- End diff --

Should be `protected`
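The PR moves the start-position handling into a new `getKafkaConsumer` that configures the consumer returned by the renamed, version-specific `createKafkaConsumer`, which is a template-method design. A simplified, self-contained sketch of that split follows, with `getKafkaConsumer` made `protected` as requested here. It is an illustration under assumptions, not Flink code: `StubConsumer` is a hypothetical stand-in for `FlinkKafkaConsumerBase` (only its `setStartFrom...` method names mirror the real consumer), it exposes the chosen start position as a field purely for the example, and the topic-only signatures omit the properties and deserialization schema.

```java
import java.util.Map;

enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

/** Hypothetical stand-in for a Kafka consumer; it only records the configured start position. */
class StubConsumer {
    final String topic;
    String startPosition = "unset";
    Map<String, Long> startOffsets;

    StubConsumer(String topic) {
        this.topic = topic;
    }

    void setStartFromEarliest() { startPosition = "earliest"; }

    void setStartFromLatest() { startPosition = "latest"; }

    void setStartFromGroupOffsets() { startPosition = "group"; }

    void setStartFromSpecificOffsets(Map<String, Long> offsets) {
        startPosition = "specific";
        startOffsets = offsets;
    }
}

abstract class KafkaSourceSketch {
    private final StartupMode startupMode;
    private final Map<String, Long> specificStartupOffsets;

    KafkaSourceSketch(StartupMode startupMode, Map<String, Long> specificStartupOffsets) {
        this.startupMode = startupMode;
        this.specificStartupOffsets = specificStartupOffsets;
    }

    /** Shared by all versions: creates the consumer, then applies the start position once. */
    protected StubConsumer getKafkaConsumer(String topic) {
        StubConsumer consumer = createKafkaConsumer(topic);
        switch (startupMode) {
            case EARLIEST:
                consumer.setStartFromEarliest();
                break;
            case LATEST:
                consumer.setStartFromLatest();
                break;
            case GROUP_OFFSETS:
                consumer.setStartFromGroupOffsets();
                break;
            case SPECIFIC_OFFSETS:
                consumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            default:
                throw new IllegalStateException("Unsupported startup mode: " + startupMode);
        }
        return consumer;
    }

    /** Version-specific factory: each subclass only has to construct its consumer. */
    protected abstract StubConsumer createKafkaConsumer(String topic);
}

/** Hypothetical subclass standing in for, e.g., a Kafka 0.10 table source. */
final class Kafka010SourceSketch extends KafkaSourceSketch {
    Kafka010SourceSketch(StartupMode mode, Map<String, Long> offsets) {
        super(mode, offsets);
    }

    @Override
    protected StubConsumer createKafkaConsumer(String topic) {
        return new StubConsumer(topic);
    }
}
```

With this split, supporting another Kafka version only requires implementing `createKafkaConsumer`. Because Java's `protected` also grants package access, tests in the same package can still call `getKafkaConsumer` without it becoming public API.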




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264140#comment-16264140 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152763593
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute(
         return builder();
     }
 
+    /**
+     * Configures the TableSource to start reading from the earliest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+     */
+    public B startReadingFromEarliest() {
+        this.startupMode = StartupMode.EARLIEST;
+        this.specificStartupOffsets = null;
+        return builder();
+    }
+
+    /**
+     * Configures the TableSource to start reading from the latest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromLatest()
+     */
+    public B startReadingFromLatest() {
--- End diff --

shorten to `fromLatestOffsets`?




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264141#comment-16264141 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152763651
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute(
         return builder();
     }
 
+    /**
+     * Configures the TableSource to start reading from the earliest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+     */
+    public B startReadingFromEarliest() {
+        this.startupMode = StartupMode.EARLIEST;
+        this.specificStartupOffsets = null;
+        return builder();
+    }
+
+    /**
+     * Configures the TableSource to start reading from the latest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromLatest()
+     */
+    public B startReadingFromLatest() {
+        this.startupMode = StartupMode.LATEST;
+        this.specificStartupOffsets = null;
+        return builder();
+    }
+
+    /**
+     * Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
+     */
+    public B startReadingFromGroupOffsets() {
--- End diff --

shorten to `fromGroupOffsets`?




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264137#comment-16264137 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152763535
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -309,6 +356,51 @@ public B withKafkaTimestampAsRowtimeAttribute(
         return builder();
     }
 
+    /**
+     * Configures the TableSource to start reading from the earliest offset for all partitions.
+     *
+     * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+     */
+    public B startReadingFromEarliest() {
--- End diff --

shorten to `fromEarliestOffsets`?




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264138#comment-16264138 ]

ASF GitHub Bot commented on FLINK-8118:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5056#discussion_r152762409
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---
@@ -68,6 +71,12 @@
     /** Descriptor for a rowtime attribute. */
     private List rowtimeAttributeDescriptors;
 
+    /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+    protected StartupMode startupMode;
--- End diff --

should be `private`. We use `protected` setter methods to set optional 
parameters from the builder (see `proctimeAttribute`).
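The convention described here, private fields that the builder sets through `protected` setters, might look like the following hypothetical, self-contained sketch. `SourceWithStartMode`, `setStartupMode`, and the nested `Builder` are assumptions for illustration, and the builder methods borrow two of the shorter names proposed in this review. Subclasses cannot assign the field directly, so the setter is the single place where the value is written and validated, and the builder only calls it when the optional parameter was configured.

```java
import java.util.Objects;

enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS }

class SourceWithStartMode {
    // Private: no subclass or other class can assign it directly.
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;

    /** Protected setter: the single place where the field is written and validated. */
    protected void setStartupMode(StartupMode startupMode) {
        this.startupMode = Objects.requireNonNull(startupMode, "startupMode must not be null");
    }

    StartupMode getStartupMode() {
        return startupMode;
    }

    /** Hypothetical builder; optional parameters are applied through the setter in build(). */
    static final class Builder {
        private StartupMode startupMode; // null means "keep the source's default"

        Builder fromEarliestOffsets() {
            this.startupMode = StartupMode.EARLIEST;
            return this;
        }

        Builder fromLatestOffsets() {
            this.startupMode = StartupMode.LATEST;
            return this;
        }

        SourceWithStartMode build() {
            SourceWithStartMode source = new SourceWithStartMode();
            if (startupMode != null) {
                source.setStartupMode(startupMode); // only set when the option was configured
            }
            return source;
        }
    }
}
```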




[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264040#comment-16264040 ]

ASF GitHub Bot commented on FLINK-8118:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5056

[FLINK-8118] [table] Allow specifying reading offsets of KafkaTableSources

## What is the purpose of the change

This PR enables the Kafka table source builder to specify the offsets from 
which the `KafkaTableSource` starts reading.

## Brief change log

  - Add methods to the builder for specifying the start offsets (earliest, 
latest, group offsets, and specific offsets).
  - Apply the configured start offsets in a new `getKafkaConsumer` method and 
rename the old `getKafkaConsumer` to `createKafkaConsumer`.
  - Add a related test and update the documentation.


## Verifying this change

The change can be verified by `KafkaTableSourceTestBase.testKafkaTSSetConsumeOffsets`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8118

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5056.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5056


commit 3fd22f85af170f20de4126abd5182abf29970ca8
Author: Xingcan Cui 
Date:   2017-11-22T16:00:39Z

[FLINK-8118][table]Allow to specify the offsets of KafkaTableSources






[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-22 Thread Xingcan Cui (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262563#comment-16262563 ]

Xingcan Cui commented on FLINK-8118:


Hi [~fhueske], thanks for the suggestions!

Since the existing options in {{KafkaTableSource.Builder}} all concern the 
{{TableSource}} itself, while the start offsets are set on the inner 
{{FlinkKafkaConsumerBase}}, the code may be a little verbose. Anyway, I'll 
open a PR soon and we can discuss it there.

Thanks, Xingcan



[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-22 Thread Fabian Hueske (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262211#comment-16262211 ]

Fabian Hueske commented on FLINK-8118:
--

Hi [~xccui],
1. I think offsets should be configured via the builder as optional 
parameters. IMO, we should not split the configuration into a builder part and 
a non-builder part.
2. For testing, you can use Mockito to replace the {{FlinkConsumer}} that is 
returned from {{getKafkaConsumer}} with a mock and check whether the correct 
method to configure the offsets is called. I did something similar to test the 
configuration of the Orc readers in the 
[OrcRowInputFormatTest|https://github.com/fhueske/flink/blob/d80506e3785268f541457a69ade3118c634cf7e7/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java].

Best, Fabian
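The suggested test replaces the consumer with a mock and checks which start-position method is called. The sketch below applies the same idea without Mockito, swapped in here only so that the example has no dependencies: a hand-written spy subclass overrides the setters and records each call, so the test never needs to read the consumer's private state. All classes are hypothetical simplifications and specific offsets are omitted for brevity; only the `setStartFrom...` method names mirror `FlinkKafkaConsumerBase`.

```java
import java.util.ArrayList;
import java.util.List;

/** Start modes for this sketch; SPECIFIC_OFFSETS is omitted for brevity. */
enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

/** Hypothetical consumer whose configured start position is private. */
class SketchConsumer {
    private String startPosition = "group"; // not readable from a test

    void setStartFromEarliest() { startPosition = "earliest"; }

    void setStartFromLatest() { startPosition = "latest"; }

    void setStartFromGroupOffsets() { startPosition = "group"; }
}

/** Hand-written spy: records each setter call by name, then delegates to the real method. */
class RecordingConsumer extends SketchConsumer {
    final List<String> calls = new ArrayList<>();

    @Override
    void setStartFromEarliest() {
        calls.add("setStartFromEarliest");
        super.setStartFromEarliest();
    }

    @Override
    void setStartFromLatest() {
        calls.add("setStartFromLatest");
        super.setStartFromLatest();
    }

    @Override
    void setStartFromGroupOffsets() {
        calls.add("setStartFromGroupOffsets");
        super.setStartFromGroupOffsets();
    }
}

/** Hypothetical source: getKafkaConsumer configures whatever createKafkaConsumer returns. */
abstract class SketchSource {
    private final StartupMode startupMode;

    SketchSource(StartupMode startupMode) {
        this.startupMode = startupMode;
    }

    SketchConsumer getKafkaConsumer() {
        SketchConsumer consumer = createKafkaConsumer();
        switch (startupMode) {
            case EARLIEST:
                consumer.setStartFromEarliest();
                break;
            case LATEST:
                consumer.setStartFromLatest();
                break;
            default:
                consumer.setStartFromGroupOffsets();
                break;
        }
        return consumer;
    }

    protected abstract SketchConsumer createKafkaConsumer();
}

final class SpyCheck {
    private SpyCheck() {}

    /** Injects the spy through the factory method and returns the calls it recorded. */
    static List<String> callsFor(StartupMode startupMode) {
        RecordingConsumer spy = new RecordingConsumer();
        SketchSource source = new SketchSource(startupMode) {
            @Override
            protected SketchConsumer createKafkaConsumer() {
                return spy;
            }
        };
        source.getKafkaConsumer();
        return spy.calls;
    }
}
```

With Mockito, the spy would instead be a mock returned from the overridden factory, and the final check would be a `verify(...)` call on that mock rather than a comparison of recorded method names.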



[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-21 Thread Xingcan Cui (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261951#comment-16261951 ]

Xingcan Cui commented on FLINK-8118:


Hi [~twalthr], thanks for raising this. I have two questions about this issue.
# Shall we integrate the methods into the {{KafkaTableSource.Builder}} or 
directly into the {{KafkaTableSource}}? Personally, I prefer the latter since 
it would be more flexible, although that seems to break the design pattern to 
some extent.
# Since the {{startupMode}} in {{FlinkKafkaConsumerBase}} is not visible to 
outer classes, do you have any suggestions on how to test this?

Thanks, Xingcan
