Repository: flink Updated Branches: refs/heads/master d95929e01 -> e4343ba0d
[FLINK-4727] [kafka] Set missing initial offset states with starting KafkaConsumer position With this change, on a clean startup of FlinkKafkaConsumer09, the automatically retrieved offsets (either earliest, latest, or an actual committed offset) from Kafka will also be checkpointed and committed, even if no records are read after the startup. This closes #2585 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4343ba0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4343ba0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4343ba0 Branch: refs/heads/master Commit: e4343ba0de24897c3cf62891429d5faf82043242 Parents: d95929e Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Tue Oct 4 16:53:38 2016 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Wed Oct 19 09:53:00 2016 +0800 ---------------------------------------------------------------------- .../streaming/connectors/kafka/Kafka010ITCase.java | 9 ++++----- .../connectors/kafka/internal/Kafka09Fetcher.java | 15 +++++++++++++++ .../streaming/connectors/kafka/Kafka09ITCase.java | 9 ++++----- 3 files changed, 23 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e4343ba0/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java index 77407ff..08511c9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -143,11 +143,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { runStartFromKafkaCommitOffsets(); } - // TODO: This test will not pass until FLINK-4727 is resolved -// @Test(timeout = 60000) -// public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { -// runAutoOffsetRetrievalAndCommitToKafka(); -// } + @Test(timeout = 60000) + public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { + runAutoOffsetRetrievalAndCommitToKafka(); + } /** * Kafka 0.10 specific test, ensuring Timestamps are properly written to and read from Kafka http://git-wip-us.apache.org/repos/asf/flink/blob/e4343ba0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index 3a3d3de..af3b199 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -205,7 +205,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem // seek the consumer to the initial offsets for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) { if (partition.isOffsetDefined()) { + LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " + + "to position {}", 
partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1); + consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1); + } else { + // for partitions that do not have offsets restored from a checkpoint/savepoint, + // we need to define our internal offset state for them using the initial offsets retrieved from Kafka + // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint + + long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle()); + + LOG.info("Partition {} has no initial offset; the consumer has position {}, so the initial offset " + + "will be set to {}", partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1); + + // the fetched offset represents the next record to process, so we need to subtract it by 1 + partition.setOffset(fetchedOffset - 1); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e4343ba0/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java index 3d347dc..d18e2a9 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java @@ -122,9 +122,8 @@ public class Kafka09ITCase extends KafkaConsumerTestBase { runStartFromKafkaCommitOffsets(); } - // TODO: This test will not pass until FLINK-4727 is resolved -// @Test(timeout = 60000) -// public void testAutoOffsetRetrievalAndCommitToKafka() 
throws Exception { -// runAutoOffsetRetrievalAndCommitToKafka(); -// } + @Test(timeout = 60000) + public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception { + runAutoOffsetRetrievalAndCommitToKafka(); + } }