[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470268#comment-16470268 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3915

> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Fang Yong
> Assignee: Fang Yong
> Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This only
> lets the job consume from the beginning or from the most recent data; it
> cannot specify the particular offset at which Kafka consumption begins.
> So, there should be a configuration item (such as
> "flink.source.start.time", with the format "-MM-dd HH:mm:ss") that
> allows users to configure the initial offset of Kafka. The behavior of
> "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint / savepoint:
> a> if the offset of the partition can be restored from the checkpoint/savepoint,
> "flink.source.start.time" is ignored.
> b> if there is no checkpoint/savepoint state for the partition (for example, the
> partition was newly added), "flink.source.start.time" is used to
> initialize the offset of the partition.
> 2) The job has no checkpoint / savepoint: "flink.source.start.time" is used
> to initialize the offset of Kafka.
> a> if "flink.source.start.time" is valid, use it to set the Kafka offset.
> b> if "flink.source.start.time" is out of range, behave as it currently does
> with no initial offset: get Kafka's current offset and start reading.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
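The decision rules in the issue description can be sketched in plain Java. This is a hypothetical illustration, not Flink code: the class and method names are invented, the start time is assumed to use a full `yyyy-MM-dd HH:mm:ss` pattern interpreted in UTC, and the offset matching the start time and the fallback offset are passed in rather than fetched from Kafka.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch of the start-offset rules proposed in the issue; not Flink code. */
final class StartTimeProposalSketch {

    // Assumption: the full pattern is yyyy-MM-dd HH:mm:ss, interpreted in UTC.
    private static final DateTimeFormatter START_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Converts a configured start time into epoch milliseconds. */
    static long parseStartTime(String value) {
        return LocalDateTime.parse(value, START_TIME_FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    /**
     * Picks the start offset for one partition.
     *
     * @param restoredOffsets    partition -> offset restored from a checkpoint/savepoint
     * @param partition          the partition being initialized
     * @param offsetForStartTime offset matching the start time, or empty if out of range
     * @param fallbackOffset     offset used today when no initial offset is configured
     */
    static long chooseStartOffset(
            Map<Integer, Long> restoredOffsets,
            int partition,
            OptionalLong offsetForStartTime,
            long fallbackOffset) {
        Long restored = restoredOffsets.get(partition);
        if (restored != null) {
            return restored;                       // 1a: restored state wins
        }
        if (offsetForStartTime.isPresent()) {
            return offsetForStartTime.getAsLong(); // 1b / 2a: use the start time
        }
        return fallbackOffset;                     // 2b: start time out of range
    }

    public static void main(String[] args) {
        Map<Integer, Long> restored = Collections.singletonMap(0, 42L);
        System.out.println(parseStartTime("1970-01-02 00:00:00"));                      // 86400000
        System.out.println(chooseStartOffset(restored, 0, OptionalLong.of(7L), 100L)); // 42
        System.out.println(chooseStartOffset(restored, 1, OptionalLong.of(7L), 100L)); // 7
        System.out.println(chooseStartOffset(Collections.emptyMap(), 1, OptionalLong.empty(), 100L)); // 100
    }
}
```

Passing the looked-up offset in as an `OptionalLong` keeps the rule testable without a broker; a real implementation would obtain it from Kafka.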
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383258#comment-16383258 ]

Tzu-Li (Gordon) Tai commented on FLINK-6352:

Merged.

1.6.0: f8ca273549aded00c7cd12699cebc1f5bba83153
1.5.0: 1fcd516a0c55f22d06b4ce3d9bc37fb9d03f0e33
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383255#comment-16383255 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5282
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16381730#comment-16381730 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282

Thanks for the review @aljoscha! I'll proceed to merge this (while addressing your last comment) to `master` and `release-1.5`.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378802#comment-16378802 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r170966200

--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -621,12 +621,70 @@ public void runStartFromSpecificOffsets() throws Exception { partitionsToValueCountAndStartOffsets.put(2, new Tuple2<>(28, 22)); // partition 2 should read offset 22-49 partitionsToValueCountAndStartOffsets.put(3, new Tuple2<>(50, 0)); // partition 3 should read offset 0-49 - readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, readProps, topicName, partitionsToValueCountAndStartOffsets); + readSequence(env, StartupMode.SPECIFIC_OFFSETS, specificStartupOffsets, null, readProps, topicName, partitionsToValueCountAndStartOffsets); kafkaOffsetHandler.close(); deleteTestTopic(topicName); } + /** +* This test ensures that the consumer correctly uses user-supplied timestamp when explicitly configured to +* start from timestamp. +* +* The validated Kafka data is written in 2 steps: first, an initial 50 records is written to each partition. +* After that, another 30 records is appended to each partition. Before each step, a timestamp is recorded. +* For the validation, when the read job is configured to start from the first timestamp, each partition should start +* from offset 0 and read a total of 80 records. When configured to start from the second timestamp, +* each partition should start from offset 50 and read on the remaining 30 appended records.
+*/ + public void runStartFromTimestamp() throws Exception { + // 4 partitions with 50 records each + final int parallelism = 4; + final int initialRecordsInEachPartition = 50; + final int appendRecordsInEachPartition = 30; + + long firstTimestamp = 0; + long secondTimestamp = 0; + String topic = ""; + + // attempt to create an appended test sequence, where the timestamp of writing the appended sequence + // is assured to be larger than the timestamp of the original sequence. + final int maxRetries = 3; + int attempt = 0; + while (attempt != maxRetries) { + firstTimestamp = System.currentTimeMillis(); + topic = writeSequence("runStartFromTimestamp", initialRecordsInEachPartition, parallelism, 1);
--- End diff --

Ah, I just thought that we could have a simple loop there:
```
long secondTimestamp = System.currentTimeMillis();
while (secondTimestamp <= firstTimestamp) {
    Thread.sleep(1); // wait 1 ms for the clock to advance
    secondTimestamp = System.currentTimeMillis();
}
```
what do you think?
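A self-contained version of the loop suggested in this review comment, with a concrete 1 ms sleep (an assumption; any short pause works) and the clock injected as a `LongSupplier` so the logic can be tested without real time passing. `ClockAdvance` and `timestampAfter` are hypothetical names, not part of the Flink test base.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper wrapping the suggested "wait until the clock advances" loop. */
final class ClockAdvance {

    /**
     * Polls {@code clock} until it returns a value strictly greater than {@code previous},
     * sleeping 1 ms between polls, and returns that value.
     */
    static long timestampAfter(long previous, LongSupplier clock) throws InterruptedException {
        long now = clock.getAsLong();
        while (now <= previous) {
            Thread.sleep(1);
            now = clock.getAsLong();
        }
        return now;
    }

    public static void main(String[] args) throws InterruptedException {
        long firstTimestamp = System.currentTimeMillis();
        // ... write the initial sequence here ...
        long secondTimestamp = timestampAfter(firstTimestamp, System::currentTimeMillis);
        System.out.println(secondTimestamp > firstTimestamp); // true
    }
}
```

Unlike a fixed number of retries, this loop cannot fail; it only waits until the strict ordering the test relies on holds.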
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16378162#comment-16378162 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282

@aljoscha sorry about the leftover merge markers, I've fixed them now.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16377178#comment-16377178 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r170663674

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java ---
@@ -129,9 +129,14 @@ public Void answer(InvocationOnMock invocation) { schema, new Properties(), 0L, +<<< HEAD
--- End diff --

Leftover merge markers?
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376818#comment-16376818 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282

@aljoscha I've addressed your comments and rebased the PR. Please have another look when you find the time, thanks a lot.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365270#comment-16365270 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5282

Yes, that was my main concern. With a loop it could work, yes.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365137#comment-16365137 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5282

@aljoscha regarding the potential flakiness of the test you mentioned:
I think the test will be stable as long as the recorded timestamp of the second run is larger than that of the first run. We can add a loop (with max retries) for the test topic generation until that condition is met.

For the verification side (reading from Kafka), we'll essentially also be relying on Kafka to return the correct offsets for a given timestamp, but that is the case for almost all Kafka ITCases.

Am I missing any other potential flakiness aspects of this?
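The "loop with max retries" idea can be sketched as a generic helper. It follows the same shape as the write/validate/delete loop in `writeSequence` quoted in this thread (retry, clean up each rejected attempt, fail after a fixed number of attempts), but `BoundedRetry` and `untilValid` are hypothetical names, and the Kafka writes are replaced by simulated `{firstTimestamp, secondTimestamp}` pairs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper in the spirit of the proposed "loop with max retries". */
final class BoundedRetry {

    /**
     * Runs {@code attempt} up to {@code maxAttempts} times and returns the first result that
     * {@code isValid} accepts. Each rejected result is handed to {@code cleanup} (e.g. deleting
     * the test topic) before the next attempt.
     */
    static <T> T untilValid(int maxAttempts, Supplier<T> attempt, Predicate<T> isValid, Consumer<T> cleanup) {
        for (int i = 0; i < maxAttempts; i++) {
            T result = attempt.get();
            if (isValid.test(result)) {
                return result;
            }
            cleanup.accept(result);
        }
        throw new IllegalStateException("No valid result after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Simulated attempts returning {firstTimestamp, secondTimestamp}; only the third
        // attempt observes a strictly larger second timestamp.
        long[][] simulated = {{100L, 100L}, {200L, 200L}, {300L, 301L}};
        int[] next = {0};
        List<long[]> discarded = new ArrayList<>();

        long[] accepted = untilValid(3, () -> simulated[next[0]++], ts -> ts[1] > ts[0], discarded::add);
        System.out.println(accepted[0] + " < " + accepted[1] + ", discarded " + discarded.size()); // 300 < 301, discarded 2
    }
}
```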
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365129#comment-16365129 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377228

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception { getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets); } else { // use the partition discoverer to fetch the initial seed partitions, - // and set their initial offsets depending on the startup mode - for (KafkaTopicPartition seedPartition : allPartitions) { - if (startupMode != StartupMode.SPECIFIC_OFFSETS) { - subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel()); - } else { + // and set their initial offsets depending on the startup mode. + // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now; + // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined + // when the partition is actually read.
+ switch (startupMode) { + case SPECIFIC_OFFSETS: if (specificStartupOffsets == null) { throw new IllegalArgumentException( "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS + - ", but no specific offsets were specified"); + ", but no specific offsets were specified."); } - Long specificOffset = specificStartupOffsets.get(seedPartition); - if (specificOffset != null) { - // since the specified offsets represent the next record to read, we subtract - // it by one so that the initial state of the consumer will be correct - subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1); - } else { - // default to group offset behaviour if the user-provided specific offsets - // do not contain a value for this partition - subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + for (KafkaTopicPartition seedPartition : allPartitions) { + Long specificOffset = specificStartupOffsets.get(seedPartition); + if (specificOffset != null) { + // since the specified offsets represent the next record to read, we subtract + // it by one so that the initial state of the consumer will be correct + subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1); + } else { + // default to group offset behaviour if the user-provided specific offsets + // do not contain a value for this partition + subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + } + } + + break; + case TIMESTAMP: + if (startupOffsetsTimestamp == null) { + throw new IllegalArgumentException(
--- End diff --

That makes sense, will change (including usage in existing code).
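For readers following the `switch` in this diff, here is a standalone sketch of the same initialization logic. Partitions are plain strings and the sentinel values are invented (Flink keeps its own in `KafkaTopicPartitionStateSentinel`). The `TIMESTAMP` branch (subtract one from the looked-up offset, fall back to the latest offset when none is found) is an assumption based on the `setStartFromTimestamp` javadoc quoted in this thread, since the diff is cut off at that point.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Standalone sketch of the startup-offset switch; names and sentinels are hypothetical. */
final class StartupOffsetsSketch {

    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST, SPECIFIC_OFFSETS, TIMESTAMP }

    // Hypothetical sentinels, far below any real "offset - 1" value, so they cannot collide
    // with a concrete starting position such as -1 (meaning "start at offset 0").
    static final long EARLIEST_OFFSET = -1_000_001L;
    static final long LATEST_OFFSET = -1_000_002L;
    static final long GROUP_OFFSET = -1_000_003L;

    static Map<String, Long> initialOffsets(
            StartupMode mode,
            List<String> partitions,
            Map<String, Long> specificOffsets,
            Long timestamp,
            BiFunction<String, Long, Long> offsetForTimestamp) {
        Map<String, Long> result = new LinkedHashMap<>();
        switch (mode) {
            case SPECIFIC_OFFSETS:
                if (specificOffsets == null) {
                    throw new IllegalArgumentException("Startup mode is SPECIFIC_OFFSETS, but no specific offsets were specified.");
                }
                for (String p : partitions) {
                    Long offset = specificOffsets.get(p);
                    // The user gives the next record to read; the state stores the last record read.
                    result.put(p, offset != null ? offset - 1 : GROUP_OFFSET);
                }
                break;
            case TIMESTAMP:
                if (timestamp == null) {
                    throw new IllegalArgumentException("Startup mode is TIMESTAMP, but no startup timestamp was specified.");
                }
                for (String p : partitions) {
                    Long offset = offsetForTimestamp.apply(p, timestamp);
                    // Assumed: no offset for the timestamp means "start from the latest offset".
                    result.put(p, offset != null ? offset - 1 : LATEST_OFFSET);
                }
                break;
            default:
                // EARLIEST, LATEST, GROUP_OFFSETS: store a sentinel; resolved when the partition is read.
                long sentinel = mode == StartupMode.EARLIEST ? EARLIEST_OFFSET
                        : mode == StartupMode.LATEST ? LATEST_OFFSET
                        : GROUP_OFFSET;
                for (String p : partitions) {
                    result.put(p, sentinel);
                }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("topic-0", "topic-1");
        Map<String, Long> specific = Collections.singletonMap("topic-0", 23L);
        System.out.println(initialOffsets(StartupMode.SPECIFIC_OFFSETS, partitions, specific, null, (p, ts) -> null));
        System.out.println(initialOffsets(StartupMode.TIMESTAMP, partitions, null, 1_000L,
                (p, ts) -> p.equals("topic-0") ? 50L : null));
    }
}
```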
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365130#comment-16365130 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377242

--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -1910,86 +1959,171 @@ public void cancel() { JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); - final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - readEnv.getConfig().disableSysoutLogging(); - readEnv.setParallelism(parallelism); + if (validateSequence(topicName, parallelism, deserSchema, numElements)) { + // everything is good! + return topicName; + } + else { + deleteTestTopic(topicName); + // fall through the loop + } + } - Properties readProps = (Properties) standardProps.clone(); - readProps.setProperty("group.id", "flink-tests-validator"); - readProps.putAll(secureProps); - FlinkKafkaConsumerBase> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps); + throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts"); + } - readEnv - .addSource(consumer) - .map(new RichMapFunction , Tuple2 >() { + protected void writeAppendSequence( + String topicName, + final int originalNumElements, + final int numElementsToAppend, + final int parallelism) throws Exception { - private final int totalCount = parallelism * numElements; - private int count = 0; + LOG.info("\n===\n" + + "== Appending sequence of " + numElementsToAppend + " into " + topicName + + "==="); - @Override - public Tuple2 map(Tuple2 value) throws Exception { - if (++count == totalCount) { - throw new SuccessException(); - } else { 
return value; - } - } - }).setParallelism(1) - .addSink(new DiscardingSink >()).setParallelism(1); + final TypeInformation > resultType = + TypeInformation.of(new TypeHint >() {}); - final AtomicReference errorRef = new AtomicReference<>(); + final KeyedSerializationSchema > serSchema = + new KeyedSerializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - Thread runner = new Thread() { - @Override - public void run() { - try { - tryExecute(readEnv, "sequence validation"); - } catch (Throwable t) { - errorRef.set(t); - } + final KeyedDeserializationSchema > deserSchema = + new KeyedDeserializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + // Write the append sequence
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365125#comment-16365125 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168377156

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** +* Specifies the consumer to start reading partitions from a specified timestamp. +* The specified timestamp must be before the current timestamp. +* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. +* +* The consumer will look up the earliest offset whose timestamp is greater than or equal +* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the +* latest offset to read data from kafka. +* +* This method does not effect where partitions are read from when the consumer is restored
--- End diff --

Will fix.
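The lookup rule in this javadoc matches the contract of Kafka's `KafkaConsumer#offsetsForTimes`, which returns, per partition, the earliest offset whose timestamp is greater than or equal to the target, or `null` if there is none. The sketch below simulates that rule over an in-memory array of record timestamps (index = offset), falls back to the log-end offset as the javadoc describes, and replays the 50 + 30 record layout described for `runStartFromTimestamp`. The class and method names are hypothetical.

```java
import java.util.OptionalLong;

/** Hypothetical in-memory model of "start from timestamp" for a single partition. */
final class TimestampLookupSketch {

    /** Earliest offset whose record timestamp is >= target, or empty if there is none. */
    static OptionalLong offsetForTime(long[] recordTimestamps, long target) {
        // Linear scan: returns the earliest match even if timestamps are not monotonic.
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    /** Start offset: the looked-up offset, else the latest (log-end) offset. */
    static long startOffset(long[] recordTimestamps, long target) {
        return offsetForTime(recordTimestamps, target).orElse(recordTimestamps.length);
    }

    public static void main(String[] args) {
        // 50 records written after t1, then 30 appended after t2 > t1.
        long t1 = 1_000L;
        long t2 = 2_000L;
        long[] log = new long[80];
        for (int i = 0; i < log.length; i++) {
            log[i] = i < 50 ? t1 + i : t2 + i;
        }
        long fromFirst = startOffset(log, t1);
        long fromSecond = startOffset(log, t2);
        System.out.println(fromFirst + " -> reads " + (log.length - fromFirst));   // 0 -> reads 80
        System.out.println(fromSecond + " -> reads " + (log.length - fromSecond)); // 50 -> reads 30
        System.out.println(startOffset(log, 3_000L));                              // 80
    }
}
```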
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365126#comment-16365126 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r168377173 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** +* Specifies the consumer to start reading partitions from a specified timestamp. +* The specified timestamp must be before the current timestamp. +* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. +* +* The consumer will look up the earliest offset whose timestamp is greater than or equal +* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the +* latest offset to read data from kafka. +* +* This method does not effect where partitions are read from when the consumer is restored +* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or +* savepoint, only the offsets in the restored state will be used. +* +* @return The consumer object, to allow function chaining. +*/ + // NOTE - + // This method is implemented in the base class because this is where the startup logging and verifications live. + // However, it is not publicly exposed since only newer Kafka versions support the functionality. + // Version-specific subclasses which can expose the functionality should override and allow public access. 
+ protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) { + checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp"); --- End diff -- I'll change to a more meaningful message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
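The Javadoc in this diff defines the lookup rule: start at the earliest offset whose timestamp is greater than or equal to the given one, and fall back to the latest offset when no such record exists. The sketch below shows that rule in memory. The class name is hypothetical, a partition is modeled as a list of record timestamps indexed by offset, and "latest" is assumed to mean the log-end offset (one past the last record). Against a real broker the lookup would go through Kafka's `KafkaConsumer#offsetsForTimes`, which reports `null` for a partition that has no record at or after the timestamp.

```java
import java.util.List;

/** Hypothetical helper illustrating the start-from-timestamp lookup rule. */
final class TimestampOffsetLookup {

    private TimestampOffsetLookup() {}

    /**
     * Returns the offset of the earliest record whose timestamp is >= startupTimestamp.
     * If no such record exists, returns the log-end offset (timestamps.size()),
     * so only records written after startup would be read.
     *
     * @param timestamps record timestamps of one partition, indexed by offset
     */
    static long startOffsetFor(List<Long> timestamps, long startupTimestamp) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= startupTimestamp) {
                return offset;
            }
        }
        // no record at or after the timestamp: fall back to the latest (log-end) offset
        return timestamps.size();
    }
}
```

A linear scan keeps the sketch correct even when timestamps are not monotonic within a partition (for example, with producer-assigned `CreateTime`). The cost is O(n) per lookup.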
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365127#comment-16365127 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r168377183 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** +* Specifies the consumer to start reading partitions from a specified timestamp. +* The specified timestamp must be before the current timestamp. +* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. +* +* The consumer will look up the earliest offset whose timestamp is greater than or equal +* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the +* latest offset to read data from kafka. +* +* This method does not effect where partitions are read from when the consumer is restored +* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or +* savepoint, only the offsets in the restored state will be used. +* +* @return The consumer object, to allow function chaining. +*/ + // NOTE - + // This method is implemented in the base class because this is where the startup logging and verifications live. + // However, it is not publicly exposed since only newer Kafka versions support the functionality. + // Version-specific subclasses which can expose the functionality should override and allow public access. 
+ protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) { + checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp"); + + long currentTimestamp = System.currentTimeMillis(); + checkArgument(startupOffsetsTimestamp <= currentTimestamp, + "Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "]."); --- End diff -- -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364023#comment-16364023 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r167941419 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** +* Specifies the consumer to start reading partitions from a specified timestamp. +* The specified timestamp must be before the current timestamp. +* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. +* +* The consumer will look up the earliest offset whose timestamp is greater than or equal +* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the +* latest offset to read data from kafka. +* +* This method does not effect where partitions are read from when the consumer is restored +* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or +* savepoint, only the offsets in the restored state will be used. +* +* @return The consumer object, to allow function chaining. +*/ + // NOTE - + // This method is implemented in the base class because this is where the startup logging and verifications live. + // However, it is not publicly exposed since only newer Kafka versions support the functionality. + // Version-specific subclasses which can expose the functionality should override and allow public access. 
+ protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) { + checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp"); --- End diff -- I think the error message might not be helpful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364024#comment-16364024 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r168169010 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** +* Specifies the consumer to start reading partitions from a specified timestamp. +* The specified timestamp must be before the current timestamp. +* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. +* +* The consumer will look up the earliest offset whose timestamp is greater than or equal +* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the +* latest offset to read data from kafka. +* +* This method does not effect where partitions are read from when the consumer is restored +* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or +* savepoint, only the offsets in the restored state will be used. +* +* @return The consumer object, to allow function chaining. +*/ + // NOTE - + // This method is implemented in the base class because this is where the startup logging and verifications live. + // However, it is not publicly exposed since only newer Kafka versions support the functionality. + // Version-specific subclasses which can expose the functionality should override and allow public access. 
+ protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) { + checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp"); + + long currentTimestamp = System.currentTimeMillis(); + checkArgument(startupOffsetsTimestamp <= currentTimestamp, + "Startup time[" + startupOffsetsTimestamp + "] must be before current time[" + currentTimestamp + "]."); --- End diff -- This should use `"%s"` for string interpolation instead of doing string concatenation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
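The reviewer's suggestion can be sketched as a template-based precondition check, where the message is formatted only when the check fails. The code below is a self-contained stand-in that assumes a Guava-style `checkArgument(boolean, String, Object...)` signature. Flink's `org.apache.flink.util.Preconditions` offers an overload in the same style. The class and method names are hypothetical. The sketch also drops the `checkNotNull` call: a primitive `long` parameter is autoboxed for that call and can never be `null`. It also takes the current time as a parameter so that the check can be tested.

```java
/** Hypothetical, self-contained stand-in for a Guava/Flink-style precondition helper. */
final class StartupChecks {

    private StartupChecks() {}

    /** Throws IllegalArgumentException with a %s-templated message when condition is false. */
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            // String.format runs only on failure, so the happy path does no message formatting
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    /** Validates a startup timestamp (epoch millis) against the given current time. */
    static long validateStartupTimestamp(long startupOffsetsTimestamp, long currentTimestamp) {
        checkArgument(
                startupOffsetsTimestamp <= currentTimestamp,
                "Startup time[%s] must be before current time[%s].",
                startupOffsetsTimestamp,
                currentTimestamp);
        return startupOffsetsTimestamp;
    }
}
```

In production code the caller would pass `System.currentTimeMillis()` as `currentTimestamp`, matching what the diff does inline.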
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364026#comment-16364026 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r168171479 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1910,86 +1959,171 @@ public void cancel() { JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); - final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart()); - readEnv.getConfig().disableSysoutLogging(); - readEnv.setParallelism(parallelism); + if (validateSequence(topicName, parallelism, deserSchema, numElements)) { + // everything is good! + return topicName; + } + else { + deleteTestTopic(topicName); + // fall through the loop + } + } - Properties readProps = (Properties) standardProps.clone(); - readProps.setProperty("group.id", "flink-tests-validator"); - readProps.putAll(secureProps); - FlinkKafkaConsumerBase> consumer = kafkaServer.getConsumer(topicName, deserSchema, readProps); + throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts"); + } - readEnv - .addSource(consumer) - .map(new RichMapFunction , Tuple2 >() { + protected void writeAppendSequence( + String topicName, + final int originalNumElements, + final int numElementsToAppend, + final int parallelism) throws Exception { - private final int totalCount = parallelism * numElements; - private int count = 0; + LOG.info("\n===\n" + + "== Appending sequence of " + numElementsToAppend + " into " + topicName + + "==="); - @Override - public Tuple2 map(Tuple2 value) throws Exception { - if (++count == totalCount) { - throw new SuccessException(); - } else { - 
return value; - } - } - }).setParallelism(1) - .addSink(new DiscardingSink >()).setParallelism(1); + final TypeInformation > resultType = + TypeInformation.of(new TypeHint >() {}); - final AtomicReference errorRef = new AtomicReference<>(); + final KeyedSerializationSchema > serSchema = + new KeyedSerializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - Thread runner = new Thread() { - @Override - public void run() { - try { - tryExecute(readEnv, "sequence validation"); - } catch (Throwable t) { - errorRef.set(t); - } + final KeyedDeserializationSchema > deserSchema = + new KeyedDeserializationSchemaWrapper<>( + new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); + + // Write the append sequence
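The refactored test helper follows a write, validate, and retry loop. It writes a sequence and validates it by reading it back. On failure it deletes the topic and tries again, throwing once `maxNumAttempts` attempts have failed. The sketch below shows that control flow in generic form. The functional parameters are hypothetical stand-ins for the test's write step, `validateSequence`, and `deleteTestTopic`. The diff does not show how the real test names each attempt's topic.

```java
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.Predicate;

/** Hypothetical sketch of the write/validate/retry loop used by the Kafka test base. */
final class SequenceWriter {

    private SequenceWriter() {}

    /**
     * @param maxNumAttempts how many attempts to make before giving up
     * @param writeAttempt   writes the sequence for the given attempt number and returns the topic name
     * @param validate       reads the topic back and returns true if the sequence is complete
     * @param deleteTopic    removes a topic whose contents failed validation
     */
    static String writeValidSequence(
            int maxNumAttempts,
            IntFunction<String> writeAttempt,
            Predicate<String> validate,
            Consumer<String> deleteTopic) throws Exception {

        for (int attempt = 0; attempt < maxNumAttempts; attempt++) {
            String topicName = writeAttempt.apply(attempt);
            if (validate.test(topicName)) {
                // everything is good
                return topicName;
            }
            // discard the broken topic and fall through to the next attempt
            deleteTopic.accept(topicName);
        }
        throw new Exception("Could not write a valid sequence to Kafka after " + maxNumAttempts + " attempts");
    }
}
```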
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364022#comment-16364022 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r167941085 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase( */ public FlinkKafkaConsumerBase setStartFromLatest() { this.startupMode = StartupMode.LATEST; + this.startupOffsetsTimestamp = null; + this.specificStartupOffsets = null; + return this; + } + + /** +* Specifies the consumer to start reading partitions from a specified timestamp. +* The specified timestamp must be before the current timestamp. +* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers. +* +* The consumer will look up the earliest offset whose timestamp is greater than or equal +* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the +* latest offset to read data from kafka. +* +* This method does not effect where partitions are read from when the consumer is restored --- End diff -- typo: effect -> affect -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364025#comment-16364025 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5282#discussion_r168169777 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java --- @@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception { getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets); } else { // use the partition discoverer to fetch the initial seed partitions, - // and set their initial offsets depending on the startup mode - for (KafkaTopicPartition seedPartition : allPartitions) { - if (startupMode != StartupMode.SPECIFIC_OFFSETS) { - subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel()); - } else { + // and set their initial offsets depending on the startup mode. + // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now; + // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined + // when the partition is actually read. 
+ switch (startupMode) { + case SPECIFIC_OFFSETS: if (specificStartupOffsets == null) { throw new IllegalArgumentException( "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS + - ", but no specific offsets were specified"); + ", but no specific offsets were specified."); } - Long specificOffset = specificStartupOffsets.get(seedPartition); - if (specificOffset != null) { - // since the specified offsets represent the next record to read, we subtract - // it by one so that the initial state of the consumer will be correct - subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1); - } else { - // default to group offset behaviour if the user-provided specific offsets - // do not contain a value for this partition - subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + for (KafkaTopicPartition seedPartition : allPartitions) { + Long specificOffset = specificStartupOffsets.get(seedPartition); + if (specificOffset != null) { + // since the specified offsets represent the next record to read, we subtract + // it by one so that the initial state of the consumer will be correct + subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1); + } else { + // default to group offset behaviour if the user-provided specific offsets + // do not contain a value for this partition + subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + } + } + + break; + case TIMESTAMP: + if (startupOffsetsTimestamp == null) { + throw new IllegalArgumentException( --- End diff -- Maybe this should be an `IllegalStateException`. The existing code also uses `IllegalArgumentException` but were quite a bit removed from the actual point
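The `switch` in this diff resolves concrete start offsets eagerly for `SPECIFIC_OFFSETS` and `TIMESTAMP`. For the lazily resolved modes it stores sentinel values instead. The sketch below shows that assignment in self-contained form, using `IllegalStateException` for missing configuration as the reviewer suggests. The enum, the sentinel constants, and the `timestampLookup` function are hypothetical stand-ins, and partitions are plain strings. The "subtract one" step mirrors the diff's comment that specified offsets denote the next record to read. The fallback to the latest offset follows the Javadoc of `setStartFromTimestamp`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical sketch of how initial start offsets could be assigned per startup mode. */
final class StartOffsetAssigner {

    enum Mode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS, TIMESTAMP }

    // Hypothetical sentinels; they must never collide with a real (offset - 1), which can be -1.
    static final long EARLIEST_SENTINEL = -1001L;
    static final long LATEST_SENTINEL = -1002L;
    static final long GROUP_OFFSET_SENTINEL = -1003L;

    private StartOffsetAssigner() {}

    /**
     * Each value in the result is either a sentinel (resolved lazily by the fetcher) or
     * the offset of the last record considered consumed, i.e. (next offset to read) - 1.
     *
     * @param timestampLookup returns the earliest offset whose timestamp >= the given timestamp, or null
     */
    static Map<String, Long> assign(
            Mode mode,
            List<String> partitions,
            Map<String, Long> specificOffsets,
            Long startupTimestamp,
            BiFunction<String, Long, Long> timestampLookup) {

        Map<String, Long> startOffsets = new HashMap<>();
        switch (mode) {
            case SPECIFIC_OFFSETS:
                if (specificOffsets == null) {
                    throw new IllegalStateException(
                            "Startup mode set to SPECIFIC_OFFSETS, but no specific offsets were specified.");
                }
                for (String partition : partitions) {
                    Long next = specificOffsets.get(partition);
                    // partitions without a user-provided offset fall back to group offsets
                    startOffsets.put(partition, next != null ? next - 1 : GROUP_OFFSET_SENTINEL);
                }
                break;
            case TIMESTAMP:
                if (startupTimestamp == null) {
                    throw new IllegalStateException(
                            "Startup mode set to TIMESTAMP, but no startup timestamp was specified.");
                }
                for (String partition : partitions) {
                    Long next = timestampLookup.apply(partition, startupTimestamp);
                    // no record at or after the timestamp: start from the latest offset
                    startOffsets.put(partition, next != null ? next - 1 : LATEST_SENTINEL);
                }
                break;
            default:
                // EARLIEST, LATEST, GROUP_OFFSETS: store a sentinel and resolve it lazily later
                long sentinel = mode == Mode.EARLIEST ? EARLIEST_SENTINEL
                        : mode == Mode.LATEST ? LATEST_SENTINEL
                        : GROUP_OFFSET_SENTINEL;
                for (String partition : partitions) {
                    startOffsets.put(partition, sentinel);
                }
        }
        return startOffsets;
    }
}
```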
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363556#comment-16363556 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5282 @aljoscha this is currently the case for any startup mode. Any partition discovered after the initial batch fetch is considered a new partition (due to Kafka scale-outs) and is therefore read from the record horizon. Startup modes apply only to existing partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
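This reply and the restore behaviour documented for `setStartFromTimestamp` together define a simple precedence order for where each partition starts reading. The sketch below illustrates it. The enum and method names are hypothetical and are not Flink API. The "initial startup" flag is assumed to mean that the partition was in the initial partition fetch of a job started without restored state.

```java
/** Hypothetical illustration of where a partition starts reading, per the rules in this thread. */
final class StartPositionRule {

    enum StartPosition { RESTORED_STATE, CONFIGURED_STARTUP_MODE, EARLIEST }

    private StartPositionRule() {}

    /**
     * @param hasRestoredOffset          the partition has an offset in restored checkpoint/savepoint state
     * @param discoveredAtInitialStartup the partition was in the initial fetch of a job started without restored state
     */
    static StartPosition startPositionFor(boolean hasRestoredOffset, boolean discoveredAtInitialStartup) {
        if (hasRestoredOffset) {
            // startup modes, including TIMESTAMP, never override restored state
            return StartPosition.RESTORED_STATE;
        }
        if (discoveredAtInitialStartup) {
            return StartPosition.CONFIGURED_STARTUP_MODE;
        }
        // partitions discovered later (e.g. after a Kafka scale-out) start from the earliest record
        return StartPosition.EARLIEST;
    }
}
```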
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362697#comment-16362697 ] ASF GitHub Bot commented on FLINK-6352: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/5282 Initial question: why is the timestamp not used for newly discovered partitions? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322119#comment-16322119 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3915 Hi @zjureel, I went ahead and addressed my own review / concerns with the change in another PR that is based on your current work: #5282. I hope that is okay; it would be great if you could review it. The main changes are: - We eagerly determine the timestamp offsets. The `LATEST`, `EARLIEST`, and `GROUP_OFFSETS` startup modes still determine the offsets lazily, while `TIMESTAMP` and `SPECIFIC_OFFSETS` will already have actual offsets before the partitions are handled by the `KafkaConsumerThread`. It dawned on me that there is actually no reason to determine the offset lazily for timestamp-based startup, since in this case the resulting offset does not depend on when we fetch the startup offsets. - Don't use `Date` to define the timestamp; just use longs. The Kafka APIs take timestamps as long values, so I figured it would make sense to follow suit. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322109#comment-16322109 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5282 cc @zjureel -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322108#comment-16322108 ] ASF GitHub Bot commented on FLINK-6352: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5282

[FLINK-6352] [kafka] Timestamp-based offset configuration for FlinkKafkaConsumer

## What is the purpose of the change

This PR is based on @zjureel's initial efforts on the feature in #3915. This version mainly differs in that:

- When using timestamps to define the offset, the actual offset is eagerly determined in the `FlinkKafkaConsumerBase` class.
- The `setStartFromTimestamp` configuration method is defined in the `FlinkKafkaConsumerBase` class, with `protected` access. Kafka versions which support the functionality should override the method with `public` access.
- The timestamp is configured simply as a long value, not as a Java `Date`.

**Overall, the usage of the feature is as follows:**

```
FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011<>(...);
consumer.setStartFromTimestamp(1515671654453L);

DataStream stream = env.addSource(consumer);
...
```

Only versions 0.10 and 0.11 support this feature.

**Semantics:**

- The provided timestamp cannot be larger than the current timestamp.
- For each partition, the earliest record whose `record timestamp >= provided timestamp` is used as the starting offset.
- If the provided timestamp is larger than the timestamp of the latest record in a partition, that partition will simply be read from the head.
- All new partitions that are discovered after the initial startup (due to scaling out Kafka) are read from the earliest possible record; the provided timestamp is not used for them.

## Brief change log

- d012826 @zjureel's initial efforts on the feature.
- 7ac07e8 Instead of lazily determining exact offsets for timestamp-based startup, the offsets are determined eagerly in `FlinkKafkaConsumerBase`. This commit also refactors the `setStartFromTimestamp` method to live in the base class.
- 32d46ef Change to just use long values to define timestamps, instead of using Java `Date`.
- 7bb44a8 General improvement for the `runStartFromTimestamp` integration test.

## Verifying this change

New integration tests `Kafka010ITCase::testStartFromTimestamp` and `Kafka011ITCase::testStartFromTimestamp` verify this new feature.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (not applicable / **docs** / **JavaDocs** / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5282.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5282

commit d012826480b7eee2641da3b260b127bf8efaf790
Author: zjureel
Date: 2017-12-21T09:49:11Z
[FLINK-6352] [kafka] Support to set offset of Kafka with specific date

commit 7ac07e8824ec42aeef6ee6b1d00650acf8ae06bb
Author: Tzu-Li (Gordon) Tai
Date: 2018-01-11T06:26:37Z
[FLINK-6352] [kafka] Eagerly determine startup offsets when startup mode is TIMESTAMP

commit 32d46ef2b98b282ca12e170702161bc123bc1f56
Author: Tzu-Li (Gordon) Tai
Date: 2018-01-11T09:33:49Z
[FLINK-6352] [kafka] Remove usage of java Date to specify startup timestamp

commit 7bb44a8d510612bff4b5137ff54f023ed556489a
Author: Tzu-Li (Gordon) Tai
Date: 2018-01-11T10:33:21Z
[FLINK-6352] [kafka, tests] Make runStartFromTimestamp more flexible
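The semantics listed in the PR description can be sketched as a pure function. This is a simplified illustration under stated assumptions, not the code from #5282. Partitions are plain `int` ids, and the per-partition broker lookups are passed in as maps. With a 0.10.1+ Kafka client, the two inputs would typically come from `KafkaConsumer#offsetsForTimes` and `KafkaConsumer#endOffsets`. `offsetsForTimes` returns the earliest offset whose timestamp is >= the given one, or `null` if there is none.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the startup-timestamp semantics described in PR #5282.
 * Partitions are plain ints and broker lookups are passed in as maps (assumptions).
 */
final class TimestampStartup {

    private TimestampStartup() {}

    /**
     * @param startupTimestamp  user-provided timestamp (epoch millis)
     * @param nowMillis         current time (epoch millis)
     * @param earliestAtOrAfter per partition: offset of the earliest record whose timestamp is
     *                          >= startupTimestamp, or null if there is no such record
     * @param endOffsets        per partition: the head offset (next offset to be written)
     * @return per-partition start offsets
     */
    static Map<Integer, Long> resolveStartOffsets(long startupTimestamp, long nowMillis,
            Map<Integer, Long> earliestAtOrAfter, Map<Integer, Long> endOffsets) {
        // The provided timestamp cannot be larger than the current timestamp.
        if (startupTimestamp > nowMillis) {
            throw new IllegalArgumentException("Startup timestamp must not be later than the current time.");
        }
        Map<Integer, Long> startOffsets = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long found = earliestAtOrAfter.get(entry.getKey());
            // No record at or after the timestamp: read the partition from its head.
            startOffsets.put(entry.getKey(), found != null ? found : entry.getValue());
        }
        return startOffsets;
    }
}
```

Partitions discovered after the initial startup are deliberately not handled here. Per the PR, those are read from the earliest record and ignore the timestamp.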
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320254#comment-16320254 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r160677582

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---
@@ -48,4 +59,18 @@ public void seekPartitionToBeginning(KafkaConsumer consumer, TopicPartitio
     public void seekPartitionToEnd(KafkaConsumer consumer, TopicPartition partition) {
         consumer.seekToEnd(Collections.singletonList(partition));
     }
+
+    @Override
+    public void seekPartitionToDate(KafkaConsumer consumer, TopicPartition partition) {
--- End diff --

But from here I can understand why. Ideally, this method signature should really be `seekPartitionToDate(KafkaConsumer, TopicPartition, Date)`, but that would require the startup date to be passed all the way to the `KafkaConsumerThread`. This also exposes something that isn't nice: the `KafkaConsumerThread` lives within the Kafka 0.9 module, while 0.9 doesn't support timestamp-based offsets ...
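The signature suggested in this review comment, with the date passed on each call rather than through the call bridge, might look roughly like the sketch below. `TimestampSeekableConsumer` is a hypothetical single-partition stand-in for the real `KafkaConsumer`. The real consumer's corresponding calls are `offsetsForTimes(Map<TopicPartition, Long>)`, `seek(TopicPartition, long)` and `seekToEnd(Collection<TopicPartition>)`. Falling back to the end of the partition when no offset is found is an assumption, chosen to match the "read from the head" semantics of #5282.

```java
import java.util.Date;

/**
 * Hypothetical stand-in for the three KafkaConsumer operations used below
 * (offsetsForTimes, seek, seekToEnd), narrowed to a single partition id.
 */
interface TimestampSeekableConsumer {
    /** Earliest offset whose record timestamp is >= timestampMillis, or null if none. */
    Long offsetForTime(int partition, long timestampMillis);

    void seek(int partition, long offset);

    void seekToEnd(int partition);
}

/** Sketch of a call-bridge method that receives the date per call instead of via its constructor. */
final class CallBridgeSketch {

    private CallBridgeSketch() {}

    static void seekPartitionToDate(TimestampSeekableConsumer consumer, int partition, Date date) {
        Long offset = consumer.offsetForTime(partition, date.getTime());
        if (offset == null) {
            // No record at or after the date (assumed behavior): start from the end of the partition.
            consumer.seekToEnd(partition);
        } else {
            consumer.seek(partition, offset);
        }
    }
}
```

Keeping the date out of the bridge's state makes the bridge stateless again. The cost is that the caller, here the consumer thread, must carry the date.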
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16320255#comment-16320255 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r160676392

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerCallBridge010.java ---
@@ -34,6 +38,13 @@
  */
 public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
 
+    private Date startupDate;
--- End diff --

Passing in the startup date to the API call bridge constructor seems to be very confusing ...
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16299782#comment-16299782 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 @tzulitai I noticed there have been some updates related to this issue, so I rebased this PR onto master. Could you please take a look when you're free? Thanks
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286936#comment-16286936 ] Tzu-Li (Gordon) Tai commented on FLINK-6352: We seem to have a user asking for this feature: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector-partition-offsets-for-a-given-timestamp-td17187.html. Elevating this issue to BLOCKER for 1.5.0.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092749#comment-16092749 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 @tzulitai No problem, thank you for your attention :)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092716#comment-16092716 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3915 @zjureel just so you know, I'm currently a bit busy with other critical bugs in the Kafka consumer. Please bear with me a little bit more ..
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069846#comment-16069846 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on the issue: https://github.com/apache/flink/pull/3915 @tzulitai Thank you for your suggestion. I think moving the conversion between date and offset to `KafkaConsumerThread` is a really good idea. I have fixed the NPE in the test case and moved the date-to-offset conversion. Please have a look when you're free, thanks
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069634#comment-16069634 ] ASF GitHub Bot commented on FLINK-6352: --- Github user zjureel commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124977924

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
--- End diff --

In fact, we do need to override this in 0.10: `FlinkKafkaConsumer010` extends `FlinkKafkaConsumer09`, and `setStartFromSpecificDate` in `FlinkKafkaConsumer09` throws an `Exception`.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069507#comment-16069507 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124965318

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
--- End diff --

I don't think you need to override this in 0.10, right? The implementation is basically identical to the base implementation.
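PR #5282 later adopted an access-widening pattern: a `protected` setter in the base class, overridden with `public` access only in versions that support timestamp lookup. The sketch below shows that pattern. All class names are hypothetical stand-ins for `FlinkKafkaConsumerBase`, `FlinkKafkaConsumer09` and `FlinkKafkaConsumer010`. With this shape, 0.9 callers outside the package cannot reach the setter at all, rather than hitting a runtime exception.

```java
/** Hypothetical stand-in for FlinkKafkaConsumerBase. */
abstract class ConsumerBaseSketch {

    enum StartupMode { GROUP_OFFSETS, TIMESTAMP }

    protected StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    protected Long startupTimestamp;

    // Protected: versions that cannot look up offsets by time simply do not expose it.
    protected ConsumerBaseSketch setStartFromTimestamp(long startupTimestamp) {
        if (startupTimestamp > System.currentTimeMillis()) {
            throw new IllegalArgumentException("Startup timestamp must not be later than the current time.");
        }
        this.startupMode = StartupMode.TIMESTAMP;
        this.startupTimestamp = startupTimestamp;
        return this;
    }
}

/** Stands in for the 0.9 consumer: inherits the setter but keeps it protected. */
class Consumer09Sketch extends ConsumerBaseSketch {}

/** Stands in for the 0.10 consumer: widens access to public and reuses the base logic. */
class Consumer010Sketch extends Consumer09Sketch {

    @Override
    public Consumer010Sketch setStartFromTimestamp(long startupTimestamp) {
        super.setStartFromTimestamp(startupTimestamp);
        return this;
    }
}
```

The 0.10 override only widens access and narrows the return type. It can therefore delegate to `super` instead of duplicating the base implementation, which was the concern raised in this review thread.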
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069510#comment-16069510 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124964762

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+    * Convert flink topic partition to kafka topic partition.
+    * @param flinkTopicPartitionMap
+    * @return
+    */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
--- End diff --

unnecessary empty line
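A typed version of the re-keying helper under review, with the extra blank line removed, might look like the sketch below. `FlinkPartition` and `KafkaPartition` are hypothetical stand-ins for Flink's `KafkaTopicPartition` and Kafka's `TopicPartition`. The `Long` values are an assumption, since the quoted diff does not show the generic parameters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for Flink's KafkaTopicPartition. */
final class FlinkPartition {
    private final String topic;
    private final int partition;

    FlinkPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String getTopic() { return topic; }

    int getPartition() { return partition; }
}

/** Hypothetical stand-in for Kafka's TopicPartition; equals/hashCode make it a valid map key. */
final class KafkaPartition {
    final String topic;
    final int partition;

    KafkaPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof KafkaPartition)) return false;
        KafkaPartition other = (KafkaPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition); }
}

final class PartitionConversions {

    private PartitionConversions() {}

    /** Re-keys a per-partition map from the Flink partition type to the Kafka partition type. */
    static Map<KafkaPartition, Long> convertFlinkToKafkaTopicPartition(Map<FlinkPartition, Long> flinkTopicPartitionMap) {
        Map<KafkaPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
        for (Map.Entry<FlinkPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
            FlinkPartition key = entry.getKey();
            topicPartitionMap.put(new KafkaPartition(key.getTopic(), key.getPartition()), entry.getValue());
        }
        return topicPartitionMap;
    }
}
```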
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069512#comment-16069512 ] ASF GitHub Bot commented on FLINK-6352: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3915#discussion_r124966020

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+    * Convert flink topic partition to kafka topic partition.
+    * @param flinkTopicPartitionMap
+    * @return
+    */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
+    }
+
+    /**
+    * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+    * @param partitionTimesMap Kafka topic partition and timestamp
+    * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+    */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Of course, this would entail that we need to encode the timestamp into a `KafkaTopicPartitionStateSentinel`.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069514#comment-16069514 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964821

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }

     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
+    }
+
+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Could you move these private aux methods to the end of the class? That would benefit the readability / flow of the code.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069513#comment-16069513 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964813

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }

     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
--- End diff --

Could you move these private aux methods to the end of the class? That would benefit the readability / flow of the code.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069509#comment-16069509 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124965642

--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema
     }

     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
+        this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+        this.specificStartupDate = date;
+        this.specificStartupOffsets = null;
+        return this;
+    }
+
+    /**
+     * Convert flink topic partition to kafka topic partition.
+     * @param flinkTopicPartitionMap
+     * @return
+     */
+    private Map convertFlinkToKafkaTopicPartition(Map flinkTopicPartitionMap) {
+        Map topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+        for (Map.Entry entry : flinkTopicPartitionMap.entrySet()) {
+            topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+        }
+
+        return topicPartitionMap;
+
+    }
+
+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

I think we need to move this conversion logic to `KafkaConsumerThread`, otherwise we would be instantiating a KafkaConsumer just for the sake of fetching timestamp-based offsets.
That's where the actual "`KafkaTopicPartitionStateSentinel` to actual offset" conversions take place. See `KafkaConsumerThread` lines 369 - 390:

```
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
//   (1) starting fresh,
//   (2) checkpoint / savepoint state we were restored with had not completely
//       been replaced with actual offset values yet, or
//   (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represent.
for (KafkaTopicPartitionState newPartitionState : newPartitions) {
    if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
        consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
        consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
        // the KafkaConsumer by default will automatically seek the consumer position
        // to the committed group offset, so we do not need to do it.
        newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
    } else {
        consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
    }
}
```
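The timestamp lookup the reviewer wants to move into `KafkaConsumerThread` can be illustrated without a broker. The sketch below is a hypothetical in-memory stand-in: `TimestampOffsetLookup`, `offsetForTime`, and `resolveStateOffset` are illustrative names, and the linear scan only mimics the documented contract of Kafka's `KafkaConsumer#offsetsForTimes` (available from client 0.10.1.0), which returns, per partition, the earliest offset whose timestamp is greater than or equal to the target, or `null` if there is none. It also follows the convention visible in the `KafkaConsumerThread` excerpt, where the partition state holds the last consumed offset (position - 1) and the consumer later seeks to state + 1.

```java
import java.util.List;

// Hypothetical, broker-free sketch; not Flink or Kafka code.
final class TimestampOffsetLookup {

    /**
     * recordTimestamps.get(i) is the timestamp of the record at offset i (offsets start
     * at 0 in this sketch). Returns the earliest offset whose timestamp is >= the target,
     * or -1 if no such record exists (offsetsForTimes would return null in that case).
     */
    static long offsetForTime(List<Long> recordTimestamps, long targetTimestamp) {
        for (int offset = 0; offset < recordTimestamps.size(); offset++) {
            if (recordTimestamps.get(offset) >= targetTimestamp) {
                return offset;
            }
        }
        return -1L;
    }

    /**
     * Converts a start timestamp into the value stored in the partition state, i.e. the
     * last consumed offset (next offset to read - 1). If no record is at or after the
     * timestamp, fall back to the log end (the "latest" position).
     */
    static long resolveStateOffset(List<Long> recordTimestamps, long targetTimestamp) {
        long found = offsetForTime(recordTimestamps, targetTimestamp);
        long nextOffsetToRead = (found >= 0) ? found : recordTimestamps.size();
        return nextOffsetToRead - 1;
    }
}
```

For records with timestamps 100, 200, and 300 at offsets 0 to 2, a start time of 150 resolves to offset 1 (state value 0), while a start time of 301 matches nothing and falls back to the log end (state value 2), the same state the `LATEST_OFFSET` branch in the excerpt would produce.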
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069511#comment-16069511 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124966340

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -697,13 +738,19 @@ protected static void initializeSubscribedPartitionsToStartOffsets(
             int indexOfThisSubtask,
             int numParallelSubtasks,
             StartupMode startupMode,
+            Date specificStartupDate,
             Map specificStartupOffsets) {

         for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
             if (i % numParallelSubtasks == indexOfThisSubtask) {
-                if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
-                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
-                } else {
+                if (startupMode == StartupMode.SPECIFIC_TIMESTAMP) {
+                    if (specificStartupDate == null) {
+                        throw new IllegalArgumentException(
+                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_TIMESTAMP +
+                            ", but no specific timestamp were specified");
+                    }
+                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), specificStartupDate.getTime());
--- End diff --

This is the main problem: following the original design pattern, it would be better to place a `KafkaTopicPartitionStateSentinel` here instead of eagerly converting the `Date` to a specific offset. We only convert the date to specific offsets when we're about to start consuming the partition (i.e. in the `KafkaConsumerThread`).
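The reviewer's point is that the per-partition start-offset map should hold a sentinel rather than `specificStartupDate.getTime()`: in the `KafkaConsumerThread` excerpt, any value that is not a known sentinel falls into the `else` branch and is used as an offset (`seek(offset + 1)`), so a raw millisecond timestamp would be misread. A minimal sketch under hypothetical names (`SentinelStartOffsets`, `TIMESTAMP_SENTINEL`, partitions modeled as plain strings) keeps the diff's round-robin assignment and defers the lookup; the timestamp itself would live in a separate field until consumption starts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the sentinel approach; not the connector's actual code.
final class SentinelStartOffsets {

    // Illustrative sentinel meaning "resolve from the configured startup timestamp later".
    // It must not collide with real offsets (>= 0) or with the connector's other sentinels.
    static final long TIMESTAMP_SENTINEL = Long.MIN_VALUE;

    /**
     * Assigns partitions to this subtask round-robin, as in the diff
     * (i % numParallelSubtasks == indexOfThisSubtask), and records the sentinel
     * instead of a raw timestamp so it is never mistaken for an offset.
     */
    static Map<String, Long> initialize(List<String> partitions, int indexOfThisSubtask, int numParallelSubtasks) {
        Map<String, Long> startOffsets = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                startOffsets.put(partitions.get(i), TIMESTAMP_SENTINEL);
            }
        }
        return startOffsets;
    }
}
```

With four partitions `t-0` to `t-3` and two subtasks, subtask 1 is assigned `t-1` and `t-3`, each mapped to the sentinel; the actual offsets would be resolved later in the consumer thread.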
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069508#comment-16069508 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r124964448

--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.

-        0.10.0.1
+        0.10.1.0
--- End diff --

Cool, thanks!
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069483#comment-16069483 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915

@zjureel there seems to be a failure in the Kafka tests caused by this PR, could you have a look?

    Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 2.875 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase
    org.apache.flink.streaming.connectors.kafka.Kafka010ProducerITCase  Time elapsed: 2.874 sec  <<< FAILURE!
    java.lang.AssertionError: Test setup failed: null
        at org.junit.Assert.fail(Assert.java:88)
        at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:226)
        at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:45)
        at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:138)
        at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.prepare(KafkaTestBase.java:98)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

    Results :

    Failed tests:
      Kafka010ITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null
      Kafka010ProducerITCase>KafkaTestBase.prepare:98->KafkaTestBase.startClusters:138 Test setup failed: null
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060579#comment-16060579 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915

@tzulitai I found there were many conflicts between this PR and master, and I have fixed them. Please have a look when you are free, thanks.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060561#comment-16060561 ]

ASF GitHub Bot commented on FLINK-6352:
---

GitHub user zjureel reopened a pull request:

https://github.com/apache/flink/pull/3915

[FLINK-6352] Support to use timestamp to set the initial offset of kafka

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful description of your changes.

  - [ ] General
    - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    - The pull request addresses only one issue
    - Each commit in the PR has a meaningful commit message (including the JIRA id)

  - [ ] Documentation
    - Documentation has been added for new functionality
    - Old documentation affected by the pull request has been updated
    - JavaDoc for public methods has been added

  - [ ] Tests & Build
    - Functionality added by the pull request is covered by tests
    - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3915

----
commit 53eaea8e73ee704e0d344fee85a67286191c6bde
Author: zjureel
Date:   2017-06-23T08:16:49Z

    [FLINK-6499] FlinkKafkaConsumer should support to use timestamp to set up start offset
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060552#comment-16060552 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/3915
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015518#comment-16015518 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117206216

--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
@@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d
         validateAutoOffsetResetValue(props);
     }

+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     *
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Indeed, users may be confused by the new method if they use both Kafka 0.8 and 0.9. Keeping new functionality backwards compatible gives a better experience; I think this method could be added once timestamps are supported by both 0.8 and 0.9.

> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>             Fix For: 1.3.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method
> can only let the job consume the beginning or the most recent data, but can
> not specify the specific offset of Kafka began to consume.
> So, there should be a configuration item (such as
> "flink.source.start.time" and the format is "yyyy-MM-dd HH:mm:ss") that
> allows user to configure the initial offset of Kafka. The action of
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (For example, this
> partition is newly increased), the "flink.kafka.start.time" will be used to
> initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range, the same as it does
> currently with no initial offset, get kafka's current offset and start reading



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015324#comment-16015324 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175760

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -311,12 +311,14 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
      * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
      * savepoint, only the offsets in the restored state will be used.
      *
-     * Note: The api is supported by kafka version >= 0.10 only.
-     *
      * @return The consumer object, to allow function chaining.
      */
     public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
-        throw new RuntimeException("This method supports kafka version >= 0.10 only.");
+        Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(), "Startup time must before curr time.");
--- End diff --

must "be" before. Could you also add the erroneous time to the error message?


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>             Fix For: 1.3.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method
> can only let the job consume the beginning or the most recent data, but can
> not specify the specific offset of Kafka began to consume.
> So, there should be a configuration item (such as
> "flink.source.start.time" and the format is "-MM-dd HH:mm:ss") that
> allows user to configure the initial offset of Kafka. The action of
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,
>   "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (For example, this
>   partition is newly increased), the "flink.kafka.start.time" will be used to
>   initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range, the same as it does
>   currently with no initial offset, get kafka's current offset and start reading



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
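The start-offset rules in the issue description (1a, 1b, 2a, 2b) reduce to a per-partition precedence: a restored offset wins, otherwise the configured start time is used, otherwise the consumer falls back to its existing default. A minimal sketch of that precedence only, assuming hypothetical names: `StartOffsetResolver`, the `String` partition keys, and the two lookup functions are illustrative stand-ins, not the connector's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of the start-offset precedence proposed in FLINK-6352. */
final class StartOffsetResolver {

    /**
     * @param partitions   partitions assigned to this subtask
     * @param restored     offsets restored from a checkpoint/savepoint (empty if there is none)
     * @param lookupByTime first offset at or after the configured start time, or null if out of range
     * @param fallback     the consumer's existing default when the start time is out of range
     */
    static Map<String, Long> resolve(
            Iterable<String> partitions,
            Map<String, Long> restored,
            Function<String, Long> lookupByTime,
            Function<String, Long> fallback) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : partitions) {
            // Rule 1a: an offset restored from state always wins; the start time is ignored.
            Long offset = restored.get(partition);
            if (offset == null) {
                // Rules 1b and 2a: no state for this partition, so use the start time.
                offset = lookupByTime.apply(partition);
            }
            if (offset == null) {
                // Rule 2b: start time is out of range, keep the current default behaviour.
                offset = fallback.apply(partition);
            }
            result.put(partition, offset);
        }
        return result;
    }
}
```

A partition missing from the restored map, such as a newly added one, goes straight to the start-time lookup, which is case 1b.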
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015322#comment-16015322 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175607

--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
@@ -171,6 +170,11 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d
     }
     @Override
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        throw new RuntimeException("This method dose not support for version 0.8 of Kafka");
--- End diff --

Do you mean 0.9? Also, typo on "dose". I would also suggest being more specific: "Starting from a specific date is not supported for Kafka version xx".
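Together with the precondition comment on `FlinkKafkaConsumerBase` above, the review asks for two behaviours: reject a null or future date with a message that includes the offending value, and fail with a version-specific message where the feature is unavailable. A minimal sketch, assuming simplified hypothetical stand-in classes (`ConsumerBaseSketch`, `Consumer09Sketch`) and using `UnsupportedOperationException` instead of the plain `RuntimeException` from the diff:

```java
import java.util.Date;

/** Hypothetical stand-in for the base consumer; only the startup-date handling is modelled. */
class ConsumerBaseSketch {
    protected Date specificStartupDate;

    ConsumerBaseSketch setStartFromSpecificDate(Date date) {
        if (date == null) {
            throw new IllegalArgumentException("Startup date must not be null.");
        }
        long now = System.currentTimeMillis();
        if (date.getTime() > now) {
            // Include the erroneous value in the message, as requested in the review.
            throw new IllegalArgumentException("Startup date " + date + " (" + date.getTime()
                    + " ms since epoch) must be before the current time (" + now + " ms since epoch).");
        }
        this.specificStartupDate = date;
        return this;
    }
}

/** Hypothetical consumer for a Kafka version without timestamp-based offset lookup. */
class Consumer09Sketch extends ConsumerBaseSketch {
    @Override
    Consumer09Sketch setStartFromSpecificDate(Date date) {
        // Name the unsupported version explicitly instead of a generic failure.
        throw new UnsupportedOperationException(
                "Starting from a specific date is not supported for Kafka version 0.9.");
    }
}
```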
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015323#comment-16015323 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117175199

--- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
@@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema d
         validateAutoOffsetResetValue(props);
     }
+    /**
+     * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
+     *
+     * @param partitionTimesMap Kafka topic partition and timestamp
+     * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
+     */
+    private Map convertTimestampToOffset(Map partitionTimesMap) {
--- End diff --

Actually, I think let's just disable the timestamp option for 0.8. I just think it's a bit strange that the functionality is there for 0.8 and 0.10, but skipped for 0.9. Sorry for jumping back and forth here; I'm trying to figure out what would be most natural.
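The Javadoc in the 0.8 diff describes a per-partition conversion: look up an offset for each partition's timestamp and use the latest offset when none exists. A hedged sketch of just that mapping step; `OffsetLookup` is a hypothetical interface standing in for whatever version-specific broker request performs the lookup, and partitions are plain `String` keys rather than the connector's partition type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical sketch of a timestamp-to-offset conversion with a latest-offset fallback. */
final class TimestampToOffsetSketch {

    /** Hypothetical lookup; returns -1 when no offset has a timestamp at or after the target. */
    interface OffsetLookup {
        long firstOffsetAtOrAfter(String partition, long timestampMs);
    }

    static Map<String, Long> convertTimestampToOffset(
            Map<String, Long> partitionTimesMap,
            OffsetLookup lookup,
            ToLongFunction<String> latestOffset) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : partitionTimesMap.entrySet()) {
            String partition = entry.getKey();
            long offset = lookup.firstOffsetAtOrAfter(partition, entry.getValue());
            // If no offset exists for the timestamp, fall back to the latest offset.
            result.put(partition, offset >= 0 ? offset : latestOffset.applyAsLong(partition));
        }
        return result;
    }
}
```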
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015307#comment-16015307 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915

@tzulitai I have fixed the `setStartFromSpecificDate` problem, and updated `FlinkKafkaConsumer08` so that it supports setting the Kafka start offsets by date.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015242#comment-16015242 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915

Thank you for your suggestion. It sounds good and will be friendlier to users than throwing an exception in `FlinkKafkaConsumerBase`. I'll fix it soon, thanks :)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015237#comment-16015237 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915

@zjureel For Kafka 0.11, I would expect it to just extend `FlinkKafkaConsumer010`. As you can see, that is also the case right now for 010; it's extending `FlinkKafkaConsumer09` and not the base class.
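The hierarchy argument can be illustrated with stripped-down stand-in classes (all names hypothetical). If the date setter is declared on the 0.10 consumer, a 0.11 consumer that extends it inherits the method, while the 0.9 consumer simply does not expose it, so misuse fails at compile time rather than with a runtime exception.

```java
import java.util.Date;

/** Hypothetical, heavily simplified consumer hierarchy; only the startup date is modelled. */
abstract class KafkaConsumerBaseSketch {
    protected Date startupDate;
}

/** 0.9 stand-in: no date setter, so calling one is a compile error. */
class KafkaConsumer09Sketch extends KafkaConsumerBaseSketch { }

/** 0.10 stand-in: declares the setter where the timestamp lookup first becomes available. */
class KafkaConsumer010Sketch extends KafkaConsumer09Sketch {
    KafkaConsumer010Sketch setStartFromSpecificDate(Date date) {
        this.startupDate = date;
        return this;
    }
}

/** A later version inherits the feature just by extending the 0.10 consumer. */
class KafkaConsumer011Sketch extends KafkaConsumer010Sketch { }
```

With this layout, `new KafkaConsumer09Sketch().setStartFromSpecificDate(date)` does not compile. The trade-off, raised in zjureel's reply, is that a future consumer extending the base class directly would not inherit the method.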
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015217#comment-16015217 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r117162289

--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
-        0.10.0.1
+        0.10.1.0
--- End diff --

The dependency trees for 0.10.0.1 and 0.10.1.0 are the same when printed with `mvn dependency:tree`:

+- org.apache.kafka:kafka-clients:jar:0.10.0.1:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile

+- org.apache.kafka:kafka-clients:jar:0.10.1.0:compile
|  +- net.jpountz.lz4:lz4:jar:1.3.0:compile
|  \- org.xerial.snappy:snappy-java:jar:1.1.2.6:compile
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16015216#comment-16015216 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/3915

@tzulitai Glad to hear from you. In fact, I also went back and forth on whether to put the `setStartFromSpecificDate` method into `FlinkKafkaConsumerBase`, and I finally put it there for two reasons:

1. All the other methods that set the Kafka start offset are in `FlinkKafkaConsumerBase`; to keep them aligned, I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase` as well.
2. This feature should also be available for subsequent Kafka versions, such as 0.11, but those consumers may need to extend `FlinkKafkaConsumerBase` directly. Since I think this method will be used by multiple implementations, I put `setStartFromSpecificDate` in `FlinkKafkaConsumerBase`.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011953#comment-16011953 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/3915

Also, it seems like 0.8 does support a timestamp-based offset retrieval. See the "finding start offsets for reads" in https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011946#comment-16011946 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674541

--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -37,7 +37,7 @@ under the License.
-        0.10.0.1
+        0.10.1.0
--- End diff --

Just to be sure: were there any additional dependencies as a result of this bump?
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011947#comment-16011947 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116674594

--- Diff: flink-connectors/flink-connector-kafka-0.10/pom.xml ---
@@ -53,6 +53,10 @@ under the License.
             org.apache.kafka
             kafka_${scala.binary.version}
+
+            org.apache.kafka
+            kafka-clients
+
--- End diff --

Could you explain a bit why this is needed now? Thanks :)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011942#comment-16011942 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116675159

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
     public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
         this.startupMode = StartupMode.GROUP_OFFSETS;
         this.specificStartupOffsets = null;
+        this.specificStartupDate = null;
         return this;
     }
     /**
+     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
+     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
--- End diff --

the kafka --> just "Kafka", with K capitalized.
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011945#comment-16011945 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/3915#discussion_r116676798

--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
     public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
         this.startupMode = StartupMode.GROUP_OFFSETS;
         this.specificStartupOffsets = null;
+        this.specificStartupDate = null;
         return this;
     }
     /**
+     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
+     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+     *
+     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
+     * If there's no such message, the consumer will use the latest offset to read data from kafka.
+     *
+     * This method does not effect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+     * savepoint, only the offsets in the restored state will be used.
+     *
+     * Note: The api is supported by kafka version >= 0.10 only.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase setStartFromSpecificDate(Date date) {
+        throw new RuntimeException("This method supports kafka version >= 0.10 only.");
--- End diff --

If only 0.10 supports this, shouldn't we add it to the `FlinkKafkaConsumer010` class only?
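The diff also makes `setStartFromGroupOffsets()` clear `specificStartupDate`, which keeps the startup modes mutually exclusive: whichever setter runs last wins, and the state of the other modes is reset. A minimal sketch of that pattern, assuming a hypothetical `StartupConfigSketch` class with a `SPECIFIC_DATE` mode and `String` partition keys (the real connector's enum values and field types may differ):

```java
import java.util.Date;
import java.util.Map;

/** Hypothetical model of the mutually exclusive startup-mode fields seen in the diff. */
class StartupConfigSketch {
    enum StartupMode { GROUP_OFFSETS, SPECIFIC_OFFSETS, SPECIFIC_DATE }

    StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    Map<String, Long> specificStartupOffsets;
    Date specificStartupDate;

    StartupConfigSketch setStartFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        // Clear state that belongs to the other modes.
        this.specificStartupOffsets = null;
        this.specificStartupDate = null;
        return this;
    }

    StartupConfigSketch setStartFromSpecificOffsets(Map<String, Long> offsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificStartupOffsets = offsets;
        this.specificStartupDate = null;
        return this;
    }

    StartupConfigSketch setStartFromSpecificDate(Date date) {
        this.startupMode = StartupMode.SPECIFIC_DATE;
        this.specificStartupOffsets = null;
        this.specificStartupDate = date;
        return this;
    }
}
```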
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011941#comment-16011941 ]

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675211

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
         public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
             this.startupMode = StartupMode.GROUP_OFFSETS;
             this.specificStartupOffsets = null;
    +        this.specificStartupDate = null;
             return this;
         }
     
         /**
    +     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    +     * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
    +     *
    +     * The consumer will look up the earliest offset whose timestamp is greater than or equal to the specific date from the kafka.
    +     * If there's no such message, the consumer will use the latest offset to read data from kafka.
    --- End diff --

    "message" --> "offset" is the term used in Kafka
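The Javadoc under review describes the lookup as "the earliest offset whose timestamp is greater than or equal to the specific date", falling back to the latest offset. A minimal sketch of those semantics over an in-memory partition follows; the partition model (offsets are `baseOffset + index`) and all names are assumptions for illustration. A broker answers this from its time index rather than by scanning records.

```java
// Hypothetical in-memory model of the lookup semantics in the Javadoc above.
// Offsets are modeled as baseOffset + index into recordTimestamps.
final class TimestampOffsetLookup {

    static long startOffsetFor(long baseOffset, long[] recordTimestamps, long startTimestamp) {
        // Scan in offset order: record timestamps need not be monotonic
        // (e.g. producer-assigned CreateTime), so the first match is the earliest offset.
        for (int i = 0; i < recordTimestamps.length; i++) {
            if (recordTimestamps[i] >= startTimestamp) {
                return baseOffset + i;
            }
        }
        // No record at or after the requested time: use the latest offset,
        // i.e. the offset the next appended record will receive.
        return baseOffset + recordTimestamps.length;
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 2000L, 3000L};
        System.out.println(startOffsetFor(10L, timestamps, 1500L)); // 11
        System.out.println(startOffsetFor(10L, timestamps, 2000L)); // 11
        System.out.println(startOffsetFor(10L, timestamps, 5000L)); // 13
    }
}
```

Returning the log-end offset in the "no such offset" case means the consumer simply waits for new data, which matches "use the latest offset to read data".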
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011943#comment-16011943 ]

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116675083

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
    @@ -290,10 +296,30 @@ public FlinkKafkaConsumerBase(List topics, KeyedDeserializationSchema
         public FlinkKafkaConsumerBase setStartFromGroupOffsets() {
             this.startupMode = StartupMode.GROUP_OFFSETS;
             this.specificStartupOffsets = null;
    +        this.specificStartupDate = null;
             return this;
         }
     
         /**
    +     * Specifies the consumer to start reading partitions from specific date. The specified date must before curr timestamp.
    --- End diff --

    "curr" --> "current"
    We usually avoid abbreviations like this in Javadoc.
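The same Javadoc states that the specified date must be before the current timestamp. One way a setter could enforce that precondition is sketched below; `StartupDateCheck` and `validateStartupDate` are hypothetical names, not code from the pull request. The current time is passed in explicitly so the check is deterministic in tests.

```java
import java.util.Date;
import java.util.Objects;

// Hypothetical precondition check for a "start from specific date" setter;
// not taken from the pull request under review.
final class StartupDateCheck {

    /** Returns the date as epoch millis, rejecting dates later than nowMillis. */
    static long validateStartupDate(Date startupDate, long nowMillis) {
        Objects.requireNonNull(startupDate, "startupDate must not be null");
        long startupMillis = startupDate.getTime();
        if (startupMillis > nowMillis) {
            throw new IllegalArgumentException(
                    "Startup date " + startupMillis + " is later than the current timestamp " + nowMillis);
        }
        return startupMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(validateStartupDate(new Date(now - 60_000L), now) == now - 60_000L); // true
        try {
            validateStartupDate(new Date(now + 60_000L), now);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```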
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011944#comment-16011944 ]

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r116674925

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---
    @@ -181,12 +181,6 @@ public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema d
     
             boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false);
     
    -        // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
    -        // this overwrites whatever setting the user configured in the properties
    -        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
    -            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    -        }
    --- End diff --

    This shouldn't be removed (I assume you accidentally removed it when rebasing?).
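The block the review asks to keep forces the Kafka client's `enable.auto.commit` to `false` whenever Flink either commits offsets on checkpoints or does not commit at all, overriding the user's properties. A standalone sketch of that rule follows. The enum is a local stand-in modeled on Flink's `OffsetCommitMode` (the `KAFKA_PERIODIC` value is an assumption of this sketch), and the key string equals Kafka's `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG`.

```java
import java.util.Properties;

// Standalone sketch of the auto-commit override kept by the review.
// The enum is a local stand-in, not an import of Flink's OffsetCommitMode.
final class AutoCommitOverride {

    enum OffsetCommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Same value as Kafka's ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    static void applyCommitMode(Properties properties, OffsetCommitMode mode) {
        // If Flink commits on checkpoints (or not at all), the client's own periodic
        // auto-commit must be off, regardless of what the user configured.
        if (mode == OffsetCommitMode.ON_CHECKPOINTS || mode == OffsetCommitMode.DISABLED) {
            properties.setProperty(ENABLE_AUTO_COMMIT, "false");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ENABLE_AUTO_COMMIT, "true");
        applyCommitMode(props, OffsetCommitMode.ON_CHECKPOINTS);
        System.out.println(props.getProperty(ENABLE_AUTO_COMMIT)); // false
    }
}
```

Without this override, a user-supplied `enable.auto.commit=true` would let the Kafka client commit offsets independently of Flink's checkpoints, so committed group offsets would no longer reflect checkpointed progress.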
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011919#comment-16011919 ]

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user zjureel commented on the issue:

    https://github.com/apache/flink/pull/3915

    @tzulitai Could you please review code here when you are free, thanks :)
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011918#comment-16011918 ]

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

GitHub user zjureel opened a pull request:

    https://github.com/apache/flink/pull/3915

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zjureel/flink FLINK-6352

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3915

----
commit e1f5aee8a471ef1f1e8cec3104807b22954b6a42
Author: zjureel
Date:   2017-05-15T10:27:24Z

    [FLINK-6352] Support to use timestamp to set the initial offset of kafka

commit 5d482c57ad19f0f9739fe5b40fe6e8713900e8a4
Author: zjureel
Date:   2017-05-16T07:37:09Z

    fix StreamExecutionEnvironment test
[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset
[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978177#comment-15978177 ]

Fang Yong commented on FLINK-6352:
----------------------------------

Hi [~tzulitai], [~rmetzger], what do you think about this? If anyone has suggestions, please feel free to comment here.