[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470268#comment-16470268
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3915


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This setting 
> only lets the job consume from the beginning or from the most recent data; it 
> cannot specify a particular Kafka offset from which to start consuming. 
> So, there should be a configuration item (such as 
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset of Kafka. The behavior of 
> "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint / savepoint
>   a> The offset of the partition can be restored from the checkpoint/savepoint: 
> "flink.source.start.time" is ignored.
>   b> There is no checkpoint/savepoint for the partition (for example, the 
> partition was newly added): "flink.kafka.start.time" is used to 
> initialize the offset of the partition.
> 2) The job has no checkpoint / savepoint: "flink.source.start.time" is used 
> to initialize the Kafka offset
>   a> "flink.source.start.time" is valid: use it to set the Kafka offset.
>   b> "flink.source.start.time" is out of range: behave as today when there is 
> no initial offset, i.e. get Kafka's current offset and start reading.
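
The rules in the quoted description can be read as a simple precedence order. The following is a minimal sketch of that reading, not Flink's actual implementation: the class name, method name, and parameters are hypothetical, and treating an out-of-range start time in case 1b the same way as in case 2b is an assumption the description does not spell out.

```java
/** Hypothetical sketch of the proposed start-position rules; not Flink code. */
final class StartPositionSketch {

    /**
     * @param restoredOffset offset restored from a checkpoint/savepoint, or null if none exists
     * @param offsetForTime  offset that the configured start time maps to, or null if out of range
     * @param currentOffset  Kafka's current offset for the partition, used as the fallback
     */
    static long resolveStartOffset(Long restoredOffset, Long offsetForTime, long currentOffset) {
        if (restoredOffset != null) {
            // Case 1a: restored state wins and the start time is ignored.
            return restoredOffset;
        }
        if (offsetForTime != null) {
            // Cases 1b and 2a: no restored state, and the start time is valid.
            return offsetForTime;
        }
        // Case 2b (assumed to also cover 1b): start time out of range.
        return currentOffset;
    }
}
```

For example, `resolveStartOffset(42L, 7L, 100L)` keeps the restored offset 42, while `resolveStartOffset(null, null, 100L)` falls back to the current offset 100.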



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-03-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383258#comment-16383258
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6352:


Merged.

1.6.0: f8ca273549aded00c7cd12699cebc1f5bba83153
1.5.0: 1fcd516a0c55f22d06b4ce3d9bc37fb9d03f0e33



[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383255#comment-16383255
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5282




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381730#comment-16381730
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282
  
Thanks for the review @aljoscha!

I'll proceed to merge this (while addressing your last comment) to `master` 
and `release-1.5`.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378802#comment-16378802
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r170966200
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws Exception {
         partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49
         partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0));  // partition 3 should read offset 0-49
 
-        readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets);
+        readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets);
 
         kafkaOffsetHandler.close();
         deleteTestTopic(topicName);
     }
 
+    /**
+     * This test ensures that the consumer correctly uses user-supplied timestamp when explicitly configured to
+     * start from timestamp.
+     *
+     * The validated Kafka data is written in 2 steps: first, an initial 50 records is written to each partition.
+     * After that, another 30 records is appended to each partition. Before each step, a timestamp is recorded.
+     * For the validation, when the read job is configured to start from the first timestamp, each partition should start
+     * from offset 0 and read a total of 80 records. When configured to start from the second timestamp,
+     * each partition should start from offset 50 and read on the remaining 30 appended records.
+     */
+    public void runStartFromTimestamp() throws Exception {
+        // 4 partitions with 50 records each
+        final int parallelism = 4;
+        final int initialRecordsInEachPartition = 50;
+        final int appendRecordsInEachPartition = 30;
+
+        long firstTimestamp = 0;
+        long secondTimestamp = 0;
+        String topic = "";
+
+        // attempt to create an appended test sequence, where the timestamp of writing the appended sequence
+        // is assured to be larger than the timestamp of the original sequence.
+        final int maxRetries = 3;
+        int attempt = 0;
+        while (attempt != maxRetries) {
+            firstTimestamp = System.currentTimeMillis();
+            topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1);
--- End diff --

Ah, I just thought that we could have a simple loop there:

```
long secondTimestamp = System.currentTimeMillis();
while (secondTimestamp <= firstTimestamp) {
  Thread.sleep(1); // short pause; the snippet as posted omitted the interval
  secondTimestamp = System.currentTimeMillis();
}
```
what do you think?
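
As a self-contained illustration of the loop suggested above, here is a minimal sketch. The class and method names are hypothetical, and the 1 ms sleep interval is an assumption, since the snippet as posted does not give one.

```java
/** Hypothetical helper illustrating the suggested wait loop; not part of the PR. */
final class TimestampWaitSketch {

    /**
     * Blocks until the wall clock reads strictly later than firstTimestamp
     * and returns that later reading.
     */
    static long awaitLaterTimestamp(long firstTimestamp) {
        long secondTimestamp = System.currentTimeMillis();
        while (secondTimestamp <= firstTimestamp) {
            try {
                Thread.sleep(1); // assumed interval; any short pause would do
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up rather than spin.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the clock to advance", e);
            }
            secondTimestamp = System.currentTimeMillis();
        }
        return secondTimestamp;
    }
}
```

Compared with retrying the whole topic generation, this only waits for the clock to tick, so the appended records would be written once.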



[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378162#comment-16378162
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282
  
@aljoscha sorry about the leftover merge markers, I've fixed them now.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377178#comment-16377178
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r170663674
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java
 ---
@@ -129,9 +129,14 @@ public Void answer(InvocationOnMock invocation) {
schema,
new Properties(),
0L,
+<<< HEAD
--- End diff --

Leftover merge markers?




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376818#comment-16376818
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282
  
@aljoscha I've addressed your comments and rebased the PR. Please have 
another look when you find the time, thanks a lot.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365270#comment-16365270
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5282
  
Yes, that was my main concern. With a loop it could work, yes.  




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365137#comment-16365137
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282
  
@aljoscha regarding the potential flakiness of the test you mentioned:
I think the test will be stable, as long as the recorded timestamp of the 
second run is larger than the first run. We can add a loop (with max retries) 
for the test topic generation, until that condition is met.

For the verification side (reading from Kafka), we'll essentially also be 
relying on Kafka to return correct offsets for a given timestamp, 
but that is the case for almost all Kafka ITCases.

Am I missing any other potential flakiness aspects of this?
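
The bounded retry described above could look roughly like the following. This is a sketch under stated assumptions, not the code from the PR: `recordLaterTimestamp` and the `writeAndRecord` supplier are hypothetical names, with the supplier standing in for "generate the test topic and record a timestamp".

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of a retry loop with a maximum number of attempts. */
final class RetryUntilLaterSketch {

    /**
     * Calls writeAndRecord up to maxRetries times and returns the first
     * timestamp that is strictly larger than firstTimestamp.
     */
    static long recordLaterTimestamp(long firstTimestamp, LongSupplier writeAndRecord, int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            long secondTimestamp = writeAndRecord.getAsLong();
            if (secondTimestamp > firstTimestamp) {
                return secondTimestamp; // condition met, stop retrying
            }
        }
        throw new IllegalStateException(
            "Could not record a later timestamp after " + maxRetries + " attempts");
    }
}
```

Failing loudly once the retries are exhausted keeps the test from silently validating against two equal timestamps.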




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365129#comment-16365129
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377228
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
                 getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
         } else {
             // use the partition discoverer to fetch the initial seed partitions,
-            // and set their initial offsets depending on the startup mode
-            for (KafkaTopicPartition seedPartition : allPartitions) {
-                if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
-                    subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
-                } else {
+            // and set their initial offsets depending on the startup mode.
+            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
+            // when the partition is actually read.
+            switch (startupMode) {
+                case SPECIFIC_OFFSETS:
                     if (specificStartupOffsets == null) {
                         throw new IllegalArgumentException(
                             "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
-                                ", but no specific offsets were specified");
+                                ", but no specific offsets were specified.");
                     }
 
-                    Long specificOffset = specificStartupOffsets.get(seedPartition);
-                    if (specificOffset != null) {
-                        // since the specified offsets represent the next record to read, we subtract
-                        // it by one so that the initial state of the consumer will be correct
-                        subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
-                    } else {
-                        // default to group offset behaviour if the user-provided specific offsets
-                        // do not contain a value for this partition
-                        subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+                    for (KafkaTopicPartition seedPartition : allPartitions) {
+                        Long specificOffset = specificStartupOffsets.get(seedPartition);
+                        if (specificOffset != null) {
+                            // since the specified offsets represent the next record to read, we subtract
+                            // it by one so that the initial state of the consumer will be correct
+                            subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
+                        } else {
+                            // default to group offset behaviour if the user-provided specific offsets
+                            // do not contain a value for this partition
+                            subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+                        }
+                    }
+
+                    break;
+                case TIMESTAMP:
+                    if (startupOffsetsTimestamp == null) {
+                        throw new IllegalArgumentException(

That makes sense, will change (including usage in existing code)
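
The `SPECIFIC_OFFSETS` branch in the diff above can be illustrated in isolation. The sketch below is a simplified model, not the connector code: partitions are plain integers instead of `KafkaTopicPartition`, and `GROUP_OFFSET_SENTINEL` is a hypothetical placeholder value rather than the real `KafkaTopicPartitionStateSentinel.GROUP_OFFSET` constant.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical model of seeding start offsets in SPECIFIC_OFFSETS mode. */
final class SpecificOffsetsSketch {

    /** Placeholder marker meaning "fall back to the committed group offset". */
    static final long GROUP_OFFSET_SENTINEL = Long.MIN_VALUE;

    static Map<Integer, Long> initialState(List<Integer> partitions, Map<Integer, Long> specificOffsets) {
        if (specificOffsets == null) {
            throw new IllegalArgumentException("No specific offsets were specified.");
        }
        Map<Integer, Long> startOffsets = new HashMap<>();
        for (Integer partition : partitions) {
            Long specificOffset = specificOffsets.get(partition);
            if (specificOffset != null) {
                // The user-supplied offset is the next record to read, so the
                // stored state is one less (the last record considered read).
                startOffsets.put(partition, specificOffset - 1);
            } else {
                // No value for this partition: default to group offset behaviour.
                startOffsets.put(partition, GROUP_OFFSET_SENTINEL);
            }
        }
        return startOffsets;
    }
}
```

With partitions 2 and 3 and a specific offset of 22 for partition 2 only, the state would hold 21 for partition 2 and the sentinel for partition 3.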



[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365130#comment-16365130
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377242
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1910,86 +1959,171 @@ public void cancel() {
 

JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 
-   final StreamExecutionEnvironment readEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
-   
readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-   readEnv.getConfig().disableSysoutLogging();
-   readEnv.setParallelism(parallelism);
+   if (validateSequence(topicName, parallelism, 
deserSchema, numElements)) {
+   // everything is good!
+   return topicName;
+   }
+   else {
+   deleteTestTopic(topicName);
+   // fall through the loop
+   }
+   }
 
-   Properties readProps = (Properties) 
standardProps.clone();
-   readProps.setProperty("group.id", 
"flink-tests-validator");
-   readProps.putAll(secureProps);
-   FlinkKafkaConsumerBase> 
consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
+   throw new Exception("Could not write a valid sequence to Kafka 
after " + maxNumAttempts + " attempts");
+   }
 
-   readEnv
-   .addSource(consumer)
-   .map(new 
RichMapFunction, Tuple2>() {
+   protected void writeAppendSequence(
+   String topicName,
+   final int originalNumElements,
+   final int numElementsToAppend,
+   final int parallelism) throws Exception {
 
-   private final int totalCount = 
parallelism * numElements;
-   private int count = 0;
+   LOG.info("\n===\n" +
+   "== Appending sequence of " + numElementsToAppend + " 
into " + topicName +
+   "===");
 
-   @Override
-   public Tuple2 
map(Tuple2 value) throws Exception {
-   if (++count == 
totalCount) {
-   throw new 
SuccessException();
-   } else {
-   return value;
-   }
-   }
-   }).setParallelism(1)
-   .addSink(new 
DiscardingSink>()).setParallelism(1);
+   final TypeInformation> resultType =
+   TypeInformation.of(new TypeHint>() {});
 
-   final AtomicReference errorRef = new 
AtomicReference<>();
+   final KeyedSerializationSchema> 
serSchema =
+   new KeyedSerializationSchemaWrapper<>(
+   new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
 
-   Thread runner = new Thread() {
-   @Override
-   public void run() {
-   try {
-   tryExecute(readEnv, "sequence 
validation");
-   } catch (Throwable t) {
-   errorRef.set(t);
-   }
+   final KeyedDeserializationSchema> 
deserSchema =
+   new KeyedDeserializationSchemaWrapper<>(
+   new 
TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   //  Write the append sequence 

[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365125#comment-16365125
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377156
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
      */
     public FlinkKafkaConsumerBase setStartFromLatest() {
         this.startupMode = StartupMode.LATEST;
+        this.startupOffsetsTimestamp = null;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading partitions from a specified timestamp.
+     * The specified timestamp must be before the current timestamp.
+     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * The consumer will look up the earliest offset whose timestamp is greater than or equal
+     * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+     * latest offset to read data from kafka.
+     *
+     * This method does not effect where partitions are read from when the consumer is restored

Will fix.


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This only 
> lets the job consume from the beginning or from the most recent data; it 
> cannot specify a particular offset at which to start consuming from Kafka. 
> So, there should be a configuration item (such as 
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset of Kafka. The behavior of 
> "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint / savepoint
>   a> If the offset of a partition can be restored from the checkpoint/savepoint, 
> "flink.source.start.time" is ignored.
>   b> If there is no checkpoint/savepoint state for the partition (for example, 
> the partition was newly added), "flink.kafka.start.time" is used to 
> initialize the offset of the partition.
> 2) The job has no checkpoint / savepoint: "flink.source.start.time" is used 
> to initialize the Kafka offsets
>   a> If "flink.source.start.time" is valid, use it to set the Kafka offset.
>   b> If "flink.source.start.time" is out of range, behave as the consumer 
> currently does with no initial offset: get Kafka's current offset and start reading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365126#comment-16365126
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377173
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
 */
public FlinkKafkaConsumerBase setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+   this.startupOffsetsTimestamp = null;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Specifies the consumer to start reading partitions from a specified timestamp.
+* The specified timestamp must be before the current timestamp.
+* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is greater than or equal
+* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+* latest offset to read data from kafka.
+*
+* This method does not effect where partitions are read from when the consumer is restored
+* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+* savepoint, only the offsets in the restored state will be used.
+*
+* @return The consumer object, to allow function chaining.
+*/
+   // NOTE -
+   // This method is implemented in the base class because this is where the startup logging and verifications live.
+   // However, it is not publicly exposed since only newer Kafka versions support the functionality.
+   // Version-specific subclasses which can expose the functionality should override and allow public access.
+   protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) {
+   checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
--- End diff --

I'll change to a more meaningful message.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365127#comment-16365127
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377183
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
 */
public FlinkKafkaConsumerBase setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+   this.startupOffsetsTimestamp = null;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Specifies the consumer to start reading partitions from a specified timestamp.
+* The specified timestamp must be before the current timestamp.
+* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is greater than or equal
+* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+* latest offset to read data from kafka.
+*
+* This method does not effect where partitions are read from when the consumer is restored
+* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+* savepoint, only the offsets in the restored state will be used.
+*
+* @return The consumer object, to allow function chaining.
+*/
+   // NOTE -
+   // This method is implemented in the base class because this is where the startup logging and verifications live.
+   // However, it is not publicly exposed since only newer Kafka versions support the functionality.
+   // Version-specific subclasses which can expose the functionality should override and allow public access.
+   protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) {
+   checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
+
+   long currentTimestamp = System.currentTimeMillis();
+   checkArgument(startupOffsetsTimestamp <= currentTimestamp,
+   "Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "].");
--- End diff --

 




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364023#comment-16364023
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r167941419
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
 */
public FlinkKafkaConsumerBase setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+   this.startupOffsetsTimestamp = null;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Specifies the consumer to start reading partitions from a specified timestamp.
+* The specified timestamp must be before the current timestamp.
+* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is greater than or equal
+* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+* latest offset to read data from kafka.
+*
+* This method does not effect where partitions are read from when the consumer is restored
+* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+* savepoint, only the offsets in the restored state will be used.
+*
+* @return The consumer object, to allow function chaining.
+*/
+   // NOTE -
+   // This method is implemented in the base class because this is where the startup logging and verifications live.
+   // However, it is not publicly exposed since only newer Kafka versions support the functionality.
+   // Version-specific subclasses which can expose the functionality should override and allow public access.
+   protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) {
+   checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
--- End diff --

I think the error message might not be helpful.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364024#comment-16364024
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168169010
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
 */
public FlinkKafkaConsumerBase setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+   this.startupOffsetsTimestamp = null;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Specifies the consumer to start reading partitions from a specified timestamp.
+* The specified timestamp must be before the current timestamp.
+* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is greater than or equal
+* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+* latest offset to read data from kafka.
+*
+* This method does not effect where partitions are read from when the consumer is restored
+* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+* savepoint, only the offsets in the restored state will be used.
+*
+* @return The consumer object, to allow function chaining.
+*/
+   // NOTE -
+   // This method is implemented in the base class because this is where the startup logging and verifications live.
+   // However, it is not publicly exposed since only newer Kafka versions support the functionality.
+   // Version-specific subclasses which can expose the functionality should override and allow public access.
+   protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) {
+   checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
+
+   long currentTimestamp = System.currentTimeMillis();
+   checkArgument(startupOffsetsTimestamp <= currentTimestamp,
+   "Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "].");
--- End diff --

This should use `"%s"` for string interpolation instead of doing string 
concatenation.
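
A minimal sketch of what this suggestion could look like. The `checkArgument` helper below is a hypothetical stand-in written for this example (it is not copied from the pull request); it assumes a precondition helper that takes a message template plus arguments and only formats the message when the check fails.

```java
final class TimestampCheck {

    // Hypothetical precondition helper: the message is a template, and it is
    // formatted only on the failure path, so the passing path builds no string.
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    // Same check as in the diff above, with "%s" placeholders instead of
    // string concatenation. The current time is a parameter to keep the
    // sketch deterministic.
    static long validateStartupTimestamp(long startupOffsetsTimestamp, long currentTimestamp) {
        checkArgument(startupOffsetsTimestamp <= currentTimestamp,
            "Startup time[%s] must be before current time[%s].",
            startupOffsetsTimestamp, currentTimestamp);
        return startupOffsetsTimestamp;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(validateStartupTimestamp(now - 1000L, now));
        try {
            validateStartupTimestamp(now + 1000L, now);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a template the message text stays readable in one piece, and the arguments are only turned into strings if the check actually fails.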




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364026#comment-16364026
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168171479
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -1910,86 +1959,171 @@ public void cancel() {
 
    JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout));
 
-   final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-   readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
-   readEnv.getConfig().disableSysoutLogging();
-   readEnv.setParallelism(parallelism);
+   if (validateSequence(topicName, parallelism, deserSchema, numElements)) {
+       // everything is good!
+       return topicName;
+   }
+   else {
+       deleteTestTopic(topicName);
+       // fall through the loop
+   }
+   }
 
-   Properties readProps = (Properties) standardProps.clone();
-   readProps.setProperty("group.id", "flink-tests-validator");
-   readProps.putAll(secureProps);
-   FlinkKafkaConsumerBase<Tuple2<Integer, Integer>> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps);
+   throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
+   }
 
-   readEnv
-       .addSource(consumer)
-       .map(new RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
+   protected void writeAppendSequence(
+       String topicName,
+       final int originalNumElements,
+       final int numElementsToAppend,
+       final int parallelism) throws Exception {
 
-           private final int totalCount = parallelism * numElements;
-           private int count = 0;
+   LOG.info("\n===\n" +
+       "== Appending sequence of " + numElementsToAppend + " into " + topicName +
+       "===");
 
-           @Override
-           public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
-               if (++count == totalCount) {
-                   throw new SuccessException();
-               } else {
-                   return value;
-               }
-           }
-       }).setParallelism(1)
-       .addSink(new DiscardingSink<Tuple2<Integer, Integer>>()).setParallelism(1);
+   final TypeInformation<Tuple2<Integer, Integer>> resultType =
+       TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
 
-   final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+   final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+       new KeyedSerializationSchemaWrapper<>(
+           new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
 
-   Thread runner = new Thread() {
-       @Override
-       public void run() {
-           try {
-               tryExecute(readEnv, "sequence validation");
-           } catch (Throwable t) {
-               errorRef.set(t);
-           }
+   final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+       new KeyedDeserializationSchemaWrapper<>(
+           new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   //  Write the append sequence 

[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364022#comment-16364022
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r167941085
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
 */
public FlinkKafkaConsumerBase setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
+   this.startupOffsetsTimestamp = null;
+   this.specificStartupOffsets = null;
+   return this;
+   }
+
+   /**
+* Specifies the consumer to start reading partitions from a specified timestamp.
+* The specified timestamp must be before the current timestamp.
+* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+*
+* The consumer will look up the earliest offset whose timestamp is greater than or equal
+* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+* latest offset to read data from kafka.
+*
+* This method does not effect where partitions are read from when the consumer is restored
--- End diff --

typo: effect -> affect




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364025#comment-16364025
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168169777
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
        getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
    } else {
        // use the partition discoverer to fetch the initial seed partitions,
-       // and set their initial offsets depending on the startup mode
-       for (KafkaTopicPartition seedPartition : allPartitions) {
-           if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
-               subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
-           } else {
+       // and set their initial offsets depending on the startup mode.
+       // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+       // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
+       // when the partition is actually read.
+       switch (startupMode) {
+           case SPECIFIC_OFFSETS:
                if (specificStartupOffsets == null) {
                    throw new IllegalArgumentException(
                        "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
-                           ", but no specific offsets were specified");
+                           ", but no specific offsets were specified.");
                }
 
-               Long specificOffset = specificStartupOffsets.get(seedPartition);
-               if (specificOffset != null) {
-                   // since the specified offsets represent the next record to read, we subtract
-                   // it by one so that the initial state of the consumer will be correct
-                   subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
-               } else {
-                   // default to group offset behaviour if the user-provided specific offsets
-                   // do not contain a value for this partition
-                   subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+               for (KafkaTopicPartition seedPartition : allPartitions) {
+                   Long specificOffset = specificStartupOffsets.get(seedPartition);
+                   if (specificOffset != null) {
+                       // since the specified offsets represent the next record to read, we subtract
+                       // it by one so that the initial state of the consumer will be correct
+                       subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
+                   } else {
+                       // default to group offset behaviour if the user-provided specific offsets
+                       // do not contain a value for this partition
+                       subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+                   }
+               }
+
+               break;
+           case TIMESTAMP:
+               if (startupOffsetsTimestamp == null) {
+                   throw new IllegalArgumentException(
--- End diff --

Maybe this should be an `IllegalStateException`. The existing code also 
uses `IllegalArgumentException` but we're quite a bit removed from the actual 
point 

[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363556#comment-16363556
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282
  
@aljoscha this is currently the case for any startup mode. Any 
partition discovered after the initial batch fetch is considered a new 
partition due to Kafka scale outs, and is therefore read from the record 
horizon. Startup modes apply only to existing partitions.
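
The rule described above can be sketched roughly as follows. This is an illustration only, not the connector's code: the class, method, and partition names are hypothetical, and "record horizon" is read here as the earliest available record, which is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionStartPositions {

    // Simplified stand-in for the consumer's startup modes.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS, TIMESTAMP }

    // Assumption of this sketch: partitions discovered after startup are read
    // from the earliest available record, whatever startup mode is configured.
    static final String DISCOVERED_PARTITION_START = "EARLIEST";

    // Maps each partition name to the position it starts reading from.
    static Map<String, String> startPositions(
            List<String> seedPartitions,
            List<String> discoveredLater,
            StartupMode configuredMode) {
        Map<String, String> positions = new LinkedHashMap<>();
        // The configured startup mode applies only to the initial (seed) partitions.
        for (String partition : seedPartitions) {
            positions.put(partition, configuredMode.name());
        }
        // Later discoveries are treated as new partitions created by a scale-out.
        for (String partition : discoveredLater) {
            positions.put(partition, DISCOVERED_PARTITION_START);
        }
        return positions;
    }

    public static void main(String[] args) {
        System.out.println(startPositions(
            Arrays.asList("topic-0", "topic-1"),
            Arrays.asList("topic-2"),
            StartupMode.TIMESTAMP));
    }
}
```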




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362697#comment-16362697
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5282
  
Initial question: why is the timestamp not used for newly discovered 
partitions?




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322119#comment-16322119
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915
  
Hi @zjureel, I went ahead and addressed my own review concerns about the
change in another PR that is based on your current work: #5282. I hope that is
okay, and it would be great if you could review it.

The main changes are:
- We eagerly determine the timestamp offsets. The `LATEST`, `EARLIEST`, and
`GROUP_OFFSETS` startup modes still determine their offsets lazily, while
`TIMESTAMP` and `SPECIFIC_OFFSETS` already have actual offsets before they are
handled by the `KafkaConsumerThread`. There is no reason to determine the
offset lazily for timestamp-based startup, since the resulting offset does not
depend on when we fetch the startup offsets.
- Don't use `Date` to define the timestamp; just use long values. The Kafka
APIs take long-valued timestamps, so it makes sense for us to follow.
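
As a rough illustration of the eager/lazy split described in the first bullet, the enum below records for each startup mode whether concrete offsets are known before the consumer thread takes over. The mode names come from the comment above; the enum `StartupModeSketch` and its `offsetsKnownUpFront` flag are hypothetical and are not the connector's actual `StartupMode` type.

```java
/** Hypothetical model of which startup modes carry concrete offsets up front. */
enum StartupModeSketch {
    GROUP_OFFSETS(false),
    EARLIEST(false),
    LATEST(false),
    TIMESTAMP(true),
    SPECIFIC_OFFSETS(true);

    private final boolean offsetsKnownUpFront;

    StartupModeSketch(boolean offsetsKnownUpFront) {
        this.offsetsKnownUpFront = offsetsKnownUpFront;
    }

    /** True when actual offsets exist before the consumer thread handles the partitions. */
    boolean offsetsKnownUpFront() {
        return offsetsKnownUpFront;
    }

    public static void main(String[] args) {
        for (StartupModeSketch mode : values()) {
            System.out.println(mode + " eager=" + mode.offsetsKnownUpFront());
        }
    }
}
```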





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322109#comment-16322109
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282
  
cc @zjureel 




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322108#comment-16322108
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5282

[FLINK-6352] [kafka] Timestamp-based offset configuration for 
FlinkKafkaConsumer

## What is the purpose of the change

This PR is based on @zjureel's initial efforts on the feature in #3915.

This version mainly differs in that:
- When using timestamps to define the offset, the actual offset is eagerly 
determined in the `FlinkKafkaConsumerBase` class.
- The `setStartFromTimestamp` configuration method is defined in the 
`FlinkKafkaConsumerBase` class, with `protected` access. Kafka versions which 
support the functionality should override the method with `public` access.
- Timestamp is configured simply as a long value, and not a Java `Date`.
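
The `protected`-in-base, `public`-in-subclass arrangement from the second bullet relies on Java allowing an override to widen access. A minimal sketch, assuming hypothetical stand-in classes (`ConsumerBaseSketch`, `Consumer010Sketch`, `Consumer09Sketch`) rather than the real connector classes:

```java
/** Small driver for the sketch below. */
final class AccessWideningDemo {
    public static void main(String[] args) {
        Consumer010Sketch consumer = new Consumer010Sketch().setStartFromTimestamp(1515671654453L);
        System.out.println(consumer.getStartupTimestamp());
    }
}

/** Hypothetical stand-in for the base consumer class. */
abstract class ConsumerBaseSketch {
    private Long startupTimestamp; // null until a timestamp is configured

    /** Protected: callers that are neither subclasses nor in this package cannot invoke it. */
    protected ConsumerBaseSketch setStartFromTimestamp(long startupTimestamp) {
        this.startupTimestamp = startupTimestamp;
        return this;
    }

    Long getStartupTimestamp() {
        return startupTimestamp;
    }
}

/** Stand-in for a version that supports timestamps: the override widens access to public. */
final class Consumer010Sketch extends ConsumerBaseSketch {
    @Override
    public Consumer010Sketch setStartFromTimestamp(long startupTimestamp) {
        super.setStartFromTimestamp(startupTimestamp);
        return this;
    }
}

/** Stand-in for a version without timestamp support: the method is left protected. */
final class Consumer09Sketch extends ConsumerBaseSketch {
}
```

In this sketch all classes share one package, so the `protected` method is still reachable from the demo; the restriction only takes effect for callers in other packages.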

**Overall, the usage of the feature is as follows:**
```
FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>(...);
consumer.setStartFromTimestamp(1515671654453L);

DataStream stream = env.addSource(consumer);
...
```

Only Kafka versions 0.10 and 0.11 support this feature.
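
Since the timestamp is a plain long, callers who start from a human-readable time need to convert it first. The sketch below assumes the value is milliseconds since the Unix epoch, which is the unit Kafka record timestamps use; `StartupTimestampSketch` and `toEpochMillis` are hypothetical helper names and not part of the PR.

```java
import java.time.Instant;

/** Hypothetical helper for producing the long value passed to setStartFromTimestamp. */
final class StartupTimestampSketch {

    /** Converts an ISO-8601 instant such as "2018-01-11T11:54:14.453Z" to epoch milliseconds. */
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // Prints 1515671654453, the value used in the usage example above.
        System.out.println(toEpochMillis("2018-01-11T11:54:14.453Z"));
    }
}
```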

**Semantics:**
- The provided timestamp cannot be larger than the current timestamp.
- For a partition, the earliest record for which
`record timestamp >= provided timestamp` is used as the starting offset.
- If the provided timestamp is larger than the timestamp of the latest record
in a partition, that partition will simply be read from the head.
- All new partitions that are discovered after the initial startup (due to
scaling out Kafka) are read from the earliest possible record, and the
provided timestamp is not used.
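
These semantics can be modelled without a broker. The sketch below treats a partition as an array of record timestamps indexed by offset and is only an illustration: the class and method names are hypothetical, and it assumes "read from the head" means starting at the log end, just past the latest record. Real code would presumably ask the broker instead, for example through Kafka's `KafkaConsumer#offsetsForTimes`.

```java
/** In-memory model of the timestamp start-offset semantics (hypothetical names). */
final class TimestampStartOffsetSketch {

    /**
     * @param recordTimestamps timestamps of the partition's records, indexed by offset
     * @param startTimestamp   the user-provided startup timestamp
     * @param nowMillis        the current time, used to reject timestamps in the future
     * @return the offset of the earliest record with timestamp >= startTimestamp,
     *         or the log end offset when no such record exists (assumption)
     */
    static long startingOffset(long[] recordTimestamps, long startTimestamp, long nowMillis) {
        if (startTimestamp > nowMillis) {
            throw new IllegalArgumentException("Startup timestamp must not be in the future.");
        }
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= startTimestamp) {
                return offset;
            }
        }
        return recordTimestamps.length;
    }

    /** Partitions discovered after startup ignore the timestamp and start at the earliest record. */
    static long startingOffsetForNewPartition() {
        return 0L;
    }

    public static void main(String[] args) {
        long[] timestamps = {100L, 200L, 300L};
        System.out.println(startingOffset(timestamps, 150L, 1_000L));
        System.out.println(startingOffset(timestamps, 400L, 1_000L));
    }
}
```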

## Brief change log

- d012826 @zjureel's initial efforts on the feature.
- 7ac07e8 Instead of lazily determining exact offsets for timestamp-based 
startup, the offsets are determined eagerly in `FlinkKafkaConsumerBase`. This 
commit also refactors the `setStartFromTimestamp` method to live in the base 
class.
- 32d46ef Change to just use long values to define timestamps, instead of 
using Java `Date`
- 7bb44a8 General improvement for the `runStartFromTimestamp` integration 
test.

## Verifying this change

New integration tests `Kafka010ITCase::testStartFromTimestamp` and
`Kafka011ITCase::testStartFromTimestamp` verify this new feature.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5282


commit d012826480b7eee2641da3b260b127bf8efaf790
Author: zjureel 
Date:   2017-12-21T09:49:11Z

[FLINK-6352] [kafka] Support to set offset of Kafka with specific date

commit 7ac07e8824ec42aeef6ee6b1d00650acf8ae06bb
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-11T06:26:37Z

[FLINK-6352] [kafka] Eagerly determine startup offsets when startup mode is 
TIMESTAMP

commit 32d46ef2b98b282ca12e170702161bc123bc1f56
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-11T09:33:49Z

[FLINK-6352] [kafka] Remove usage of java Date to specify startup timestamp

commit 7bb44a8d510612bff4b5137ff54f023ed556489a
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-11T10:33:21Z

[FLINK-6352] [kafka, tests] Make runStartFromTimestamp more flexible





[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320254#comment-16320254
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r160677582
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -48,4 +59,18 @@ public void seekPartitionToBeginning(KafkaConsumer 
consumer, TopicPartitio
public void seekPartitionToEnd(KafkaConsumer consumer, 
TopicPartition partition) {
consumer.seekToEnd(Collections.singletonList(partition));
}
+
+   @Override
+   public void seekPartitionToDate(KafkaConsumer consumer, 
TopicPartition partition) {
--- End diff --

But from here I can understand why.

Ideally, this method signature should really be
`seekPartitionToDate(KafkaConsumer, TopicPartition, Date)`, but that would
require the startup date to be passed all the way to the `KafkaConsumerThread`.
This also exposes an awkward fact: the `KafkaConsumerThread` lives within the
Kafka 0.9 module, while 0.9 doesn't support timestamp-based offsets ...




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320255#comment-16320255
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r160676392
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java
 ---
@@ -34,6 +38,13 @@
  */
 public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
 
+   private Date startupDate;
--- End diff --

Passing in the startup date to the API call bridge constructor seems to be 
very confusing ...




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299782#comment-16299782
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai I noticed there were some updates to this issue, so I rebased this
PR onto master. Could you please take a look when you're free? Thanks.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-12-11 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286936#comment-16286936
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6352:


We seem to have a user asking for this feature:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector-partition-offsets-for-a-given-timestamp-td17187.html.

Elevating this issue to BLOCKER for 1.5.0.

> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method
> only lets the job consume from the beginning or from the most recent data; it
> cannot specify a particular Kafka offset at which to begin consuming.
> So, there should be a configuration item (such as 
> "flink.source.start.time" and the format is "-MM-dd HH:mm:ss") that 
> allows user to configure the initial offset of Kafka. The action of 
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,  
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (For example, this 
> partition is newly increased), the "flink.kafka.start.time" will be used to 
> initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used 
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range, the same as it does 
> currently with no initial offset, get kafka's current offset and start reading





[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092749#comment-16092749
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai No problem, thank you for your attention :)


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method
> only lets the job consume from the beginning or from the most recent data; it
> cannot specify a particular Kafka offset at which to begin consuming.
> So, there should be a configuration item (such as 
> "flink.source.start.time" and the format is "-MM-dd HH:mm:ss") that 
> allows user to configure the initial offset of Kafka. The action of 
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,  
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (For example, this 
> partition is newly increased), the "flink.kafka.start.time" will be used to 
> initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used 
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range, the same as it does 
> currently with no initial offset, get kafka's current offset and start reading





[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092716#comment-16092716
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915
  
@zjureel just so you know, I'm currently a bit busy with other critical 
bugs in the Kafka consumer.
Please bear with me a little bit more ..




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069846#comment-16069846
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai Thank you for your suggestion. I think moving the conversion
between date and offset to `KafkaConsumerThread` is a really good idea. I have
fixed the NPE in the test case and moved the conversion of date to offset;
please have a look when you are free, thanks.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069634#comment-16069634
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124977924
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
--- End diff --

In fact, we do need to override this in 0.10. `FlinkKafkaConsumer010`
extends `FlinkKafkaConsumer09`, and `setStartFromSpecificDate` in
`FlinkKafkaConsumer09` throws an exception.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069507#comment-16069507
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124965318
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, 
KeyedDeserializationSchema
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
--- End diff --

I don't think you need to override this in 0.10, right?
The implementation is basically identical to the base implementation.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069510#comment-16069510
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964762
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
 
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
--- End diff --

unnecessary empty line


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Fang Yong
> Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This only 
> lets the job consume from the beginning or from the most recent data; it 
> cannot specify an arbitrary offset at which Kafka consumption should begin. 
> So, there should be a configuration item (such as 
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset of Kafka. The behavior of 
> "flink.source.start.time" is as follows:
> 1) the job starts from a checkpoint / savepoint
>   a> the offset of the partition can be restored from the checkpoint/savepoint: 
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (for example, the 
> partition was newly added): the "flink.kafka.start.time" will be used to 
> initialize the offset of the partition
> 2) the job has no checkpoint / savepoint: the "flink.source.start.time" is used 
> to initialize the offset of Kafka
>   a> the "flink.source.start.time" is valid: use it to set the offset of Kafka
>   b> the "flink.source.start.time" is out-of-range: behave as the consumer does 
> currently with no initial offset, i.e. get Kafka's current offset and start reading
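
The rules in the description above can be sketched as a small decision function. This is only an illustration under stated assumptions, not code from the PR: the class `StartPositionSketch`, the enum `Source`, and the method `choose` are hypothetical names, and "out-of-range" is assumed here to mean "outside the timestamps of the oldest and newest records in the partition", which the issue text does not define.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of the start-position rules; not part of Flink. */
final class StartPositionSketch {

    /** Where a partition should take its starting position from. */
    enum Source { RESTORED_OFFSET, TIMESTAMP_LOOKUP, CURRENT_OFFSET }

    /**
     * @param restoredOffset     offset restored from a checkpoint/savepoint, if any
     * @param startTimeMillis    configured start time in epoch millis, if any
     * @param oldestRecordMillis timestamp of the oldest record (assumed range check)
     * @param newestRecordMillis timestamp of the newest record (assumed range check)
     */
    static Source choose(OptionalLong restoredOffset,
                         OptionalLong startTimeMillis,
                         long oldestRecordMillis,
                         long newestRecordMillis) {
        // Rule 1a: a restored offset always wins; the start time is ignored.
        if (restoredOffset.isPresent()) {
            return Source.RESTORED_OFFSET;
        }
        // Rules 1b and 2a: no restored offset, so a valid start time is used.
        if (startTimeMillis.isPresent()) {
            long t = startTimeMillis.getAsLong();
            if (t >= oldestRecordMillis && t <= newestRecordMillis) {
                return Source.TIMESTAMP_LOOKUP;
            }
        }
        // Rule 2b: out-of-range (or missing) start time, fall back to Kafka's current offset.
        return Source.CURRENT_OFFSET;
    }

    public static void main(String[] args) {
        System.out.println(choose(OptionalLong.empty(), OptionalLong.of(1500L), 1000L, 2000L));
    }
}
```

Keeping the decision separate from the actual offset lookup makes rule 1b (a newly added partition in a restored job) fall out of the same code path as rule 2a.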





[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069512#comment-16069512
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124966020
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
 
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
+    }
+
+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Of course, this would entail that we need to encode the timestamp into a 
`KafkaTopicPartitionStateSentinel`.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069514#comment-16069514
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964821
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
 
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
+    }
+
+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Could you move these private aux methods to the end of the class? That 
would benefit the readability / flow of the code.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069513#comment-16069513
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964813
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
 
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
--- End diff --

Could you move these private aux methods to the end of the class? That 
would benefit the readability / flow of the code.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069509#comment-16069509
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124965642
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
 
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
+    }
+
+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

I think we need to move this conversion logic to `KafkaConsumerThread`, 
otherwise we would be instantiating a KafkaConsumer just for the sake of 
fetching timestamp-based offsets.
That's where the actual "`KafkaTopicPartitionStateSentinel` to actual 
offset" conversions take place.
See `KafkaConsumerThread` lines 369 - 390
```
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
//   (1) starting fresh,
//   (2) checkpoint / savepoint state we were restored with had not completely
//   been replaced with actual offset values yet, or
//   (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState newPartitionState : newPartitions) {
    if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
        consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
        consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
        // the KafkaConsumer by default will automatically seek the consumer position
        // to the committed group offset, so we do not need to do it.

        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else {
        consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
    }
}
```
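
To make the convention in the quoted loop easier to follow, here is a minimal stand-alone model of it. It is a sketch under assumptions, not Flink code: `SentinelResolutionSketch`, `resolvePosition`, and `toStateOffset` are hypothetical names, and the three constants are placeholder values picked for this example rather than the real `KafkaTopicPartitionStateSentinel` values. The point it illustrates is that the state stores the last-read offset (position minus one), so a concrete state offset is resumed at offset plus one.

```java
/** Hypothetical model of resolving placeholder offsets to a consumer position. */
final class SentinelResolutionSketch {

    // Placeholder values for this sketch only; they are NOT Flink's constants.
    static final long EARLIEST_OFFSET = -1001L;
    static final long LATEST_OFFSET = -1002L;
    static final long GROUP_OFFSET = -1003L;

    /**
     * Returns the position (next offset to read) for one partition.
     *
     * @param stateOffset       a placeholder sentinel or the last-read offset
     * @param beginPosition     position after seeking to the beginning
     * @param endPosition       position after seeking to the end
     * @param committedPosition position the consumer group last committed
     */
    static long resolvePosition(long stateOffset,
                                long beginPosition,
                                long endPosition,
                                long committedPosition) {
        if (stateOffset == EARLIEST_OFFSET) {
            return beginPosition;
        }
        if (stateOffset == LATEST_OFFSET) {
            return endPosition;
        }
        if (stateOffset == GROUP_OFFSET) {
            return committedPosition;
        }
        // A concrete state offset is the last record already read; resume one past it.
        return stateOffset + 1;
    }

    /** The state keeps position - 1, mirroring the setOffset(...) calls in the quoted loop. */
    static long toStateOffset(long position) {
        return position - 1;
    }

    public static void main(String[] args) {
        long position = resolvePosition(LATEST_OFFSET, 0L, 100L, 42L);
        System.out.println("position=" + position + ", stateOffset=" + toStateOffset(position));
    }
}
```

A timestamp-based start mode would add one more branch of the same shape, which is why the conversion fits naturally next to the existing sentinel handling.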



[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069511#comment-16069511
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124966340
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -697,13 +738,19 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
             int indexOfThisSubtask,
             int numParallelSubtasks,
             StartupMode startupMode,
+            Date specificStartupDate,
             Map specificStartupOffsets) {
 
         for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
             if (i % numParallelSubtasks == indexOfThisSubtask) {
-                if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
-                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
-                } else {
+                if (startupMode == StartupMode.SPECIFIC_TIMESTAMP) {
+                    if (specificStartupDate == null) {
+                        throw new IllegalArgumentException(
+                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_TIMESTAMP +
+                            ", but no specific timestamp were specified");
+                    }
+                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), specificStartupDate.getTime());
--- End diff --

This is the main problem:
following the original design pattern, it would be better to place a 
`KafkaTopicPartitionStateSentinel` here instead of eagerly converting the 
`Date` to a specific offset. We should only convert the date to specific offsets 
when we're about to start consuming the partition (i.e. in `KafkaConsumerThread`).
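
A minimal sketch of the deferred pattern suggested above, assuming an in-memory log instead of a real Kafka client. All names here (`DeferredTimestampSketch`, `TIMESTAMP_PLACEHOLDER`, `assign`, `resolve`) are hypothetical and the placeholder value is arbitrary. The assignment step only records a placeholder for each partition owned by the subtask. The resolution step, which would run when consumption is about to start, picks the earliest offset whose record timestamp is at or after the start time and falls back to the log end otherwise, as the `convertTimestampToOffset` Javadoc in the PR describes. In a real consumer that lookup would presumably be delegated to the Kafka client, for example `KafkaConsumer.offsetsForTimes` in client 0.10.1.0 and later.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: store a placeholder at assignment time, resolve it lazily. */
final class DeferredTimestampSketch {

    // Arbitrary placeholder for "start from timestamp"; not a real Flink sentinel.
    static final long TIMESTAMP_PLACEHOLDER = -2001L;

    /** Assignment step: distribute partitions round-robin and store only the placeholder. */
    static Map<String, Long> assign(List<String> partitions, int subtaskIndex, int parallelism) {
        Map<String, Long> startOffsets = new LinkedHashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % parallelism == subtaskIndex) {
                startOffsets.put(partitions.get(i), TIMESTAMP_PLACEHOLDER);
            }
        }
        return startOffsets;
    }

    /**
     * Resolution step, run just before consuming. Assumes offsets start at 0 and
     * recordTimestamps[offset] is the timestamp of the record at that offset.
     * Returns the earliest offset with timestamp >= startMillis, or the log end if none.
     */
    static long resolve(long[] recordTimestamps, long startMillis) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= startMillis) {
                return offset;
            }
        }
        return recordTimestamps.length;
    }

    public static void main(String[] args) {
        Map<String, Long> assigned = assign(Arrays.asList("t-0", "t-1", "t-2"), 0, 2);
        System.out.println(assigned.keySet() + " -> " + resolve(new long[] {10L, 20L, 30L}, 15L));
    }
}
```

Deferring the lookup this way means no Kafka client has to be created at assignment time, and partitions discovered later can be resolved by the same code.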




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069508#comment-16069508
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964448
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

cool, thanks!




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069483#comment-16069483
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915
  
@zjureel there seems to be a failure in the Kafka tests caused by this PR, 
could you have a look?
>Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2.875 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase
org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase  Time elapsed: 2.874 sec  <<< FAILURE!
java.lang.AssertionError: Test setup failed: null
    at org.junit.Assert.fail(Assert.java:88)
    at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:226)
    at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:45)
    at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:138)
    at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:98)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)


Results :

Failed tests: 
  Kafka010ITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null
  Kafka010ProducerITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null



[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060579#comment-16060579
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai I found many conflicts between this PR and master, and I 
have fixed them. Please have a look when you are free, thanks.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060561#comment-16060561
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

GitHub user zjureel reopened a pull request:

https://github.com/apache/flink/pull/3915

[FLINK-6352] Support to use timestamp to set the initial offset of kafka



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3915


commit 53eaea8e73ee704e0d344fee85a67286191c6bde
Author: zjureel 
Date:   2017-06-23T08:16:49Z

[FLINK-6499] FlinkKafkaConsumer should support to use timestamp to set up 
start offset




> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This only
> lets a job consume from the beginning or from the most recent data; it
> cannot specify a particular Kafka offset from which to start consuming.
> So, there should be a configuration item (such as
> "flink.source.start.time" and the format is "-MM-dd HH:mm:ss") that
> allows users to configure the initial offset of Kafka. The behavior of
> "flink.source.start.time" is as follows:
> 1) the job starts from a checkpoint / savepoint
>   a> the offset of a partition can be restored from the checkpoint/savepoint:
> "flink.source.start.time" is ignored.
>   b> there is no checkpoint/savepoint state for the partition (for example, the
> partition was newly added): "flink.source.start.time" is used to
> initialize the offset of the partition
> 2) the job has no checkpoint / savepoint: "flink.source.start.time" is used
> to initialize the Kafka offsets
>   a> "flink.source.start.time" is valid: use it to set the Kafka offset
>   b> "flink.source.start.time" is out of range: behave as is done currently
> when there is no initial offset, i.e. get Kafka's current offset and start reading
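
The cases listed in the description can be read as a small decision procedure. The following is only a sketch of that reading, not the connector's code: the class and method names are hypothetical, and it assumes that an out-of-range start time in case 1b falls back to the current offset in the same way as case 2b, which the description does not state.

```java
import java.util.OptionalLong;

/** Hypothetical helper; the names are illustrative and not part of Flink. */
final class StartOffsetSketch {

    /**
     * @param restoredOffset     offset restored from a checkpoint/savepoint, if any
     * @param offsetForStartTime offset found for the configured start time; empty when
     *                           no start time is set or the time is out of range
     * @param currentOffset      Kafka's current offset for the partition
     */
    static long resolveStartOffset(OptionalLong restoredOffset,
                                   OptionalLong offsetForStartTime,
                                   long currentOffset) {
        // Case 1a: restored state wins and the start time is ignored.
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong();
        }
        // Cases 1b and 2a: no restored state, but the start time maps to an offset.
        if (offsetForStartTime.isPresent()) {
            return offsetForStartTime.getAsLong();
        }
        // Case 2b (and, by assumption, 1b with an out-of-range time): current offset.
        return currentOffset;
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(OptionalLong.of(120L), OptionalLong.of(75L), 300L)); // 120
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.of(75L), 300L));  // 75
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.empty(), 300L));  // 300
    }
}
```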



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-06-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060552#comment-16060552
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/3915


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.4.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015518#comment-16015518
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117206216
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d
validateAutoOffsetResetValue(props);
}
 
+   /**
+    * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+    *
+    * @param partitionTimesMap Kafka topic partition and timestamp
+    * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+    */
+   private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Indeed, users may be confused by the new method when they use both Kafka 
version 0.8 and 0.9. Backward compatibility for new functionality gives a 
better experience; I think this method could be added once timestamps are 
supported by both version 0.8 and 0.9.


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015324#comment-16015324
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175760
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -311,12 +311,14 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
 * savepoint, only the offsets in the restored state will be used.
 *
-* Note: The api is supported by kafka version >= 0.10 only.
-*
 * @return The consumer object, to allow function chaining.
 */
public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
-   throw new RuntimeException("This method supports kafka version >= 0.10 only.");
+   Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
--- End diff --

must "be" before.
Could you also add the erroneous time to the error message?
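
A minimal sketch of what the requested message could look like follows. It uses plain Java rather than Flink's `Preconditions`, the class name and the `nowMillis` parameter are hypothetical (the parameter only makes the check easy to exercise), and the exact wording of the message is an assumption rather than what was eventually merged.

```java
import java.util.Date;

/** Hypothetical validation helper; not the code from the pull request. */
final class StartDateCheckSketch {

    /** Returns the date unchanged if it is non-null and not later than nowMillis. */
    static Date checkStartDate(Date date, long nowMillis) {
        if (date == null) {
            throw new IllegalArgumentException("Startup time must not be null.");
        }
        if (date.getTime() > nowMillis) {
            // Include the offending value so the user can see what was rejected.
            throw new IllegalArgumentException(
                "Startup time [" + date.getTime() + " ms] must be before the current time ["
                    + nowMillis + " ms].");
        }
        return date;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(checkStartDate(new Date(now - 60_000L), now).getTime());
        try {
            checkStartDate(new Date(now + 60_000L), now);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```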


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015322#comment-16015322
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175607
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 ---
@@ -171,6 +170,11 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d
}
 
@Override
+   public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+   throw new RuntimeException("This method dose not support for version 0.8 of Kafka");
--- End diff --

Do you mean 0.9? Also, typo on "dose".
I would also suggest to be more specific: "Starting from a specific date is 
not supported for Kafka version xx".


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015323#comment-16015323
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175199
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d
validateAutoOffsetResetValue(props);
}
 
+   /**
+    * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+    *
+    * @param partitionTimesMap Kafka topic partition and timestamp
+    * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+    */
+   private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Actually, I think let's just disable the timestamp option for 0.8.

I just think it's a bit strange that the functionality is there for 0.8 and 
0.10, but skipped for 0.9.

Sorry for jumping back and forth here, trying to figure out what would be 
most natural.


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015307#comment-16015307
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai I have fixed the `setStartFromSpecificDate` problem and updated 
`FlinkKafkaConsumer08` so that it supports setting Kafka start offsets by date.


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015242#comment-16015242
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
Thank you for your suggestion. It sounds good and will be friendlier to 
users than throwing an exception in `FlinkKafkaConsumerBase`. I'll fix it soon, 
thanks :)


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015237#comment-16015237
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915
  
@zjureel For Kafka 0.11, I would expect it to just extend 
`FlinkKafkaConsumer010`.
As you can see, that is also the case right now for 010; it's extending 
`FlinkKafkaConsumer09` and not the base class.
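
To make the inheritance argument concrete, here is a small sketch with stand-in classes. None of it is Flink's real code: the nested class names (`Base`, `V09`, `V010`, `V011`) are hypothetical placeholders for the versioned consumers, and the use of `UnsupportedOperationException` is an assumption. It only illustrates that a 0.11 consumer extending the 0.10 one would inherit the date-based start position without touching the base class.

```java
import java.util.Date;

/** Hypothetical stand-ins for the connector classes; none of this is Flink's real code. */
final class KafkaConsumerHierarchySketch {

    abstract static class Base {
        private Long startTimestampMillis; // null until a start date is configured

        /** Declared in the base class so that every version exposes the same method. */
        abstract Base setStartFromSpecificDate(Date date);

        Long startTimestampMillis() {
            return startTimestampMillis;
        }

        protected void rememberStartDate(Date date) {
            startTimestampMillis = date.getTime();
        }
    }

    /** Stand-in for a version without timestamp support. */
    static class V09 extends Base {
        @Override
        Base setStartFromSpecificDate(Date date) {
            throw new UnsupportedOperationException(
                "Starting from a specific date is not supported for Kafka version 0.9.");
        }
    }

    /** Stand-in for 0.10: extends the 0.9 class and enables the feature. */
    static class V010 extends V09 {
        @Override
        Base setStartFromSpecificDate(Date date) {
            rememberStartDate(date);
            return this;
        }
    }

    /** Stand-in for a future 0.11 consumer: inherits the 0.10 behaviour unchanged. */
    static class V011 extends V010 { }

    public static void main(String[] args) {
        Base consumer = new V011().setStartFromSpecificDate(new Date(1_495_000_000_000L));
        System.out.println(consumer.startTimestampMillis());
        try {
            new V09().setStartFromSpecificDate(new Date(0L));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```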


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015217#comment-16015217
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117162289
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

The dependency tree of 0.10.0.1 and 0.10.1.0 is the same when I use mvn 
dependency:tree to print the dependency information:
+- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile

+- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015216#comment-16015216
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai Glad to hear from you. In fact I also went back and forth on whether to 
put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I 
finally put it into `FlinkKafkaConsumerBase` for two reasons:

1. All the other methods that set the Kafka start offset are in 
`FlinkKafkaConsumerBase`; to stay consistent with them, I put 
`setStartFromSpecificDate` in `FlinkKafkaConsumerBase` as well.
2. For subsequent versions of Kafka, such as version 0.11, this feature 
should also be available, but their consumers may need to extend 
`FlinkKafkaConsumerBase` directly. I think this method will be used in multiple 
implementations, so I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011953#comment-16011953
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915
  
Also, it seems like 0.8 does support a timestamp-based offset retrieval. 
See the "finding start offsets for reads" in 
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
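
For readers following the thread, the idea of a timestamp-based lookup can be sketched without any Kafka dependency. The class below is hypothetical and works on an in-memory index; it does not model how any Kafka version actually resolves timestamps. It only shows the rule stated in the pull request's Javadoc: take the earliest offset at or after the timestamp, and fall back to the latest offset if there is none.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of a timestamp-to-offset lookup for one partition. */
final class TimestampOffsetLookupSketch {

    /**
     * @param offsetsByTimestamp message timestamp (ms) mapped to the offset of the first
     *                           message carrying that timestamp
     * @param timestampMillis    requested start time
     * @param latestOffset       offset to fall back to when no message is new enough
     */
    static long offsetForTimestamp(TreeMap<Long, Long> offsetsByTimestamp,
                                   long timestampMillis,
                                   long latestOffset) {
        // ceilingEntry returns the entry with the smallest key >= timestampMillis, or null.
        Map.Entry<Long, Long> entry = offsetsByTimestamp.ceilingEntry(timestampMillis);
        return entry != null ? entry.getValue() : latestOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(1000L, 0L);
        index.put(2000L, 10L);
        index.put(3000L, 25L);

        System.out.println(offsetForTimestamp(index, 1500L, 40L)); // 10
        System.out.println(offsetForTimestamp(index, 3500L, 40L)); // 40
    }
}
```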


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011946#comment-16011946
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674541
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
 


-   0.10.0.1
+   0.10.1.0
--- End diff --

Just to be sure: were there any additional dependencies as a result of this 
bump?


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
> Fix For: 1.3.0
>
>


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011947#comment-16011947
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674594
  
--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -53,6 +53,10 @@ under the License.
org.apache.kafka

kafka_${scala.binary.version}

+   
+   org.apache.kafka
+   kafka-clients
+   
--- End diff --

Could you explain a bit why this is needed now? Thanks :)




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011942#comment-16011942
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675159
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
     public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
         this.startupMode = StartupMode.GROUP_OFFSETS;
         this.specificStartupOffsets = null;
+        this.specificStartupDate = null;
         return this;
     }
 
     /**
+     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
+     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
--- End diff --

the kafka --> just "Kafka", with K capitalized.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011945#comment-16011945
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116676798
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
     public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
         this.startupMode = StartupMode.GROUP_OFFSETS;
         this.specificStartupOffsets = null;
+        this.specificStartupDate = null;
         return this;
     }
 
     /**
+     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
+     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
+     * If there's no such message, the consumer will use the latest offset to read data from kafka.
+     *
+     * This method does not effect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+     * savepoint, only the offsets in the restored state will be used.
+     *
+     * Note: The api is supported by kafka version >= 0.10 only.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        throw new RuntimeException("This method supports kafka version >= 0.10 only.");
--- End diff --

If only 0.10 supports this, shouldn't we add it to the 
`FlinkKafkaConsumer010` class only?
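
A minimal sketch of the alternative suggested in this comment: the setter lives only on the version-specific class, so calling it on the base type is a compile-time error instead of a `RuntimeException` at run time. All class names here (`VersionSpecificApiSketch`, `BaseConsumer`, `Consumer010`) are hypothetical stand-ins rather than the real Flink classes; the check that the date is not in the future follows the Javadoc in the diff above.

```java
import java.util.Date;
import java.util.Objects;

/** Hypothetical sketch: expose a version-specific setter only on the subclass. */
class VersionSpecificApiSketch {

    /** Stand-in for a version-independent base consumer; it has no date setter. */
    static class BaseConsumer {
        protected Long startTimestampMillis; // null means "not configured"

        BaseConsumer setStartFromGroupOffsets() {
            this.startTimestampMillis = null; // reset any previously set start date
            return this;
        }

        Long getStartTimestampMillis() {
            return startTimestampMillis;
        }
    }

    /** Stand-in for the consumer of the Kafka version that supports timestamps. */
    static class Consumer010 extends BaseConsumer {
        Consumer010 setStartFromSpecificDate(Date date) {
            Objects.requireNonNull(date, "date");
            if (date.getTime() > System.currentTimeMillis()) {
                throw new IllegalArgumentException("start date must not be in the future");
            }
            this.startTimestampMillis = date.getTime();
            return this;
        }
    }

    public static void main(String[] args) {
        Consumer010 consumer = new Consumer010().setStartFromSpecificDate(new Date(0L));
        System.out.println(consumer.getStartTimestampMillis()); // prints 0

        // new BaseConsumer().setStartFromSpecificDate(new Date(0L));
        // would not compile: the base type does not declare the method.
    }
}
```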




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011941#comment-16011941
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675211
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
     public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
         this.startupMode = StartupMode.GROUP_OFFSETS;
         this.specificStartupOffsets = null;
+        this.specificStartupDate = null;
         return this;
     }
 
     /**
+     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
+     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
+     * If there's no such message, the consumer will use the latest offset to read data from kafka.
--- End diff --

"message" --> "offset" is the term used in Kafka




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011943#comment-16011943
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675083
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
     public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
         this.startupMode = StartupMode.GROUP_OFFSETS;
         this.specificStartupOffsets = null;
+        this.specificStartupDate = null;
         return this;
     }
 
     /**
+     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
--- End diff --

"curr" --> "current"
We usually avoid abbreviations like this in Javadoc.




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011944#comment-16011944
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674925
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
@@ -181,12 +181,6 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d
 
         boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
 
-        // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
-        // this overwrites whatever setting the user configured in the properties
-        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
-            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        }
--- End diff --

This shouldn't be removed (I assume you accidentally removed it when 
rebasing?).
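
For context, the removed lines force Kafka's own auto commit off whenever the connector's commit mode is `ON_CHECKPOINTS` or `DISABLED`, regardless of what the user configured. A self-contained sketch of that guard follows, assuming a local `OffsetCommitMode` enum as a stand-in for the connector's enum and the literal key `enable.auto.commit` (the Kafka consumer setting behind `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`). The class and method names are hypothetical, and unlike the original code the sketch returns a copy instead of mutating the properties in place.

```java
import java.util.Properties;

/** Hypothetical sketch of the auto-commit guard that the diff above removes. */
class AutoCommitOverrideSketch {

    /** Local stand-in for the connector's commit-mode enum. */
    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /** Returns a copy of the user properties with auto commit forced off where required. */
    static Properties adjustAutoCommit(Properties userProperties, OffsetCommitMode mode) {
        Properties result = new Properties();
        result.putAll(userProperties);
        // In these modes Kafka's periodic auto commit must not run,
        // so the user's setting is overwritten.
        if (mode == OffsetCommitMode.ON_CHECKPOINTS || mode == OffsetCommitMode.DISABLED) {
            result.setProperty("enable.auto.commit", "false");
        }
        return result;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("enable.auto.commit", "true");

        System.out.println(adjustAutoCommit(user, OffsetCommitMode.ON_CHECKPOINTS)
                .getProperty("enable.auto.commit")); // prints false
        System.out.println(adjustAutoCommit(user, OffsetCommitMode.KAFKA_PERIODIC)
                .getProperty("enable.auto.commit")); // prints true
    }
}
```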




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011919#comment-16011919
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915
  
@tzulitai Could you please review the code here when you are free? Thanks :)




[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011918#comment-16011918
 ] 

ASF GitHub Bot commented on FLINK-6352:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/3915

[FLINK-6352] Support to use timestamp to set the initial offset of kafka


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3915


commit e1f5aee8a471ef1f1e8cec3104807b22954b6a42
Author: zjureel 
Date:   2017-05-15T10:27:24Z

[FLINK-6352] Support to use timestamp to set the initial offset of kafka

commit 5d482c57ad19f0f9739fe5b40fe6e8713900e8a4
Author: zjureel 
Date:   2017-05-16T07:37:09Z

fix StreamExecutionEnvironment test






[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2017-04-21 Thread Fang Yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978177#comment-15978177
 ] 

Fang Yong commented on FLINK-6352:
--

Hi [~tzulitai], [~rmetzger], what do you think about this? If anyone has 
suggestions, please feel free to comment here.



