Repository: flink
Updated Branches:
  refs/heads/master d95929e01 -> e4343ba0d


[FLINK-4727] [kafka] Set missing initial offset states with starting 
KafkaConsumer position

With this change, on a clean startup of FlinkKafkaConsumer09, the auto 
retrieved offsets (either earliest, latest, or an actual
committed offset) from Kafka will also be checkpointed and committed, even if 
no records are read after the startup.

This closes #2585


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4343ba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4343ba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4343ba0

Branch: refs/heads/master
Commit: e4343ba0de24897c3cf62891429d5faf82043242
Parents: d95929e
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Tue Oct 4 16:53:38 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Wed Oct 19 09:53:00 2016 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/Kafka010ITCase.java   |  9 ++++-----
 .../connectors/kafka/internal/Kafka09Fetcher.java    | 15 +++++++++++++++
 .../streaming/connectors/kafka/Kafka09ITCase.java    |  9 ++++-----
 3 files changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4343ba0/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 77407ff..08511c9 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -143,11 +143,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase 
{
                runStartFromKafkaCommitOffsets();
        }
 
-       // TODO: This test will not pass until FLINK-4727 is resolved
-//     @Test(timeout = 60000)
-//     public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
-//             runAutoOffsetRetrievalAndCommitToKafka();
-//     }
+       @Test(timeout = 60000)
+       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
+       }
 
        /**
         * Kafka 0.10 specific test, ensuring Timestamps are properly written 
to and read from Kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/e4343ba0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 3a3d3de..af3b199 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -205,7 +205,22 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, 
TopicPartition> implem
                        // seek the consumer to the initial offsets
                        for (KafkaTopicPartitionState<TopicPartition> partition 
: subscribedPartitions()) {
                                if (partition.isOffsetDefined()) {
+                                       LOG.info("Partition {} has restored 
initial offsets {} from checkpoint / savepoint; seeking the consumer " +
+                                               "to position {}", 
partition.getKafkaPartitionHandle(), partition.getOffset(), 
partition.getOffset() + 1);
+
                                        
consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
+                               } else {
+                                       // for partitions that do not have 
offsets restored from a checkpoint/savepoint,
+                                       // we need to define our internal 
offset state for them using the initial offsets retrieved from Kafka
+                                       // by the KafkaConsumer, so that they 
are correctly checkpointed and committed on the next checkpoint
+
+                                       long fetchedOffset = 
consumer.position(partition.getKafkaPartitionHandle());
+
+                                       LOG.info("Partition {} has no initial 
offset; the consumer has position {}, so the initial offset " +
+                                               "will be set to {}", 
partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1);
+
+                                       // the fetched offset represents the 
next record to process, so we need to subtract it by 1
+                                       partition.setOffset(fetchedOffset - 1);
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4343ba0/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
index 3d347dc..d18e2a9 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09ITCase.java
@@ -122,9 +122,8 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
                runStartFromKafkaCommitOffsets();
        }
 
-       // TODO: This test will not pass until FLINK-4727 is resolved
-//     @Test(timeout = 60000)
-//     public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
-//             runAutoOffsetRetrievalAndCommitToKafka();
-//     }
+       @Test(timeout = 60000)
+       public void testAutoOffsetRetrievalAndCommitToKafka() throws Exception {
+               runAutoOffsetRetrievalAndCommitToKafka();
+       }
 }

Reply via email to