Repository: kafka
Updated Branches:
  refs/heads/0.10.1 027455146 -> c310a1bc6


KAFKA-4547; Avoid unnecessary offset commit that could lead to an invalid 
offset position if partition is paused

Author: Vahid Hashemian <vahidhashem...@us.ibm.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2431 from vahidhashemian/KAFKA-4547-0.10.1


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c310a1bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c310a1bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c310a1bc

Branch: refs/heads/0.10.1
Commit: c310a1bc658bf4cea146e620f988ff19589bca14
Parents: 0274551
Author: Vahid Hashemian <vahidhashem...@us.ibm.com>
Authored: Tue Jan 24 15:30:07 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Jan 24 15:30:07 2017 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  6 +-
 .../clients/consumer/internals/Fetcher.java     |  2 +-
 .../consumer/internals/SubscriptionState.java   | 14 ++++-
 .../clients/consumer/KafkaConsumerTest.java     | 55 +++++++++++++++++
 .../clients/consumer/internals/FetcherTest.java | 62 ++++++++++++++++++++
 5 files changed, 132 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b384211..d309111 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1523,9 +1523,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // the user is manually assigning partitions and managing their own offsets).
         fetcher.resetOffsetsIfNeeded(partitions);
 
-        if (!subscriptions.hasAllFetchPositions()) {
-            // if we still don't have offsets for all partitions, then we should either seek
-            // to the last committed position or reset using the auto reset policy
+        if (!subscriptions.hasAllFetchPositions(partitions)) {
+            // if we still don't have offsets for the given partitions, then we should either
+            // seek to the last committed position or reset using the auto reset policy
 
             // first refresh commits for all assigned partitions
             coordinator.refreshCommittedOffsetsIfNeeded();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index fdcfc30..3b8a81c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -206,7 +206,7 @@ public class Fetcher<K, V> {
     public void updateFetchPositions(Set<TopicPartition> partitions) {
         // reset the fetch position to the committed position
         for (TopicPartition tp : partitions) {
-            if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
+            if (!subscriptions.isAssigned(tp) || subscriptions.hasValidPosition(tp))
                 continue;
 
             if (subscriptions.isOffsetResetNeeded(tp)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 6dc2060..12830ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -341,13 +341,17 @@ public class SubscriptionState {
         return assignedState(partition).resetStrategy;
     }
 
-    public boolean hasAllFetchPositions() {
-        for (TopicPartitionState state : assignment.partitionStateValues())
-            if (!state.hasValidPosition())
+    public boolean hasAllFetchPositions(Collection<TopicPartition> partitions) {
+        for (TopicPartition partition : partitions)
+            if (!hasValidPosition(partition))
                 return false;
         return true;
     }
 
+    public boolean hasAllFetchPositions() {
+        return hasAllFetchPositions(this.assignedPartitions());
+    }
+
     public Set<TopicPartition> missingFetchPositions() {
         Set<TopicPartition> missing = new HashSet<>();
         for (PartitionStates.PartitionState<TopicPartitionState> state : assignment.partitionStates()) {
@@ -369,6 +373,10 @@ public class SubscriptionState {
         return isAssigned(tp) && assignedState(tp).isFetchable();
     }
 
+    public boolean hasValidPosition(TopicPartition tp) {
+        return isAssigned(tp) && assignedState(tp).hasValidPosition();
+    }
+
     public void pause(TopicPartition tp) {
         assignedState(tp).pause();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bf45ee6..05d48c5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -56,6 +56,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
@@ -1035,6 +1036,60 @@ public class KafkaConsumerTest {
         consumer.close();
     }
 
+    @Test
+    public void testOffsetOfPausedPartitions() {
+        int rebalanceTimeoutMs = 60000;
+        int sessionTimeoutMs = 30000;
+        int heartbeatIntervalMs = 3000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster(topic, 2);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+        PartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
+
+        // lookup coordinator
+        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE.code(), node).toStruct(), node);
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+
+        // manual assignment
+        Set<TopicPartition> partitions = Utils.mkSet(tp0, tp1);
+        consumer.assign(partitions);
+        // verify consumer's assignment
+        assertTrue(consumer.assignment().equals(partitions));
+
+        consumer.pause(partitions);
+        consumer.seekToEnd(partitions);
+
+        // fetch and verify committed offset of two partitions
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        offsets.put(tp0, 0L);
+        offsets.put(tp1, 0L);
+
+        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE.code()), coordinator);
+        assertEquals(0, consumer.committed(tp0).offset());
+        assertEquals(0, consumer.committed(tp1).offset());
+
+        // fetch and verify consumer's position in the two partitions
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 3L), Errors.NONE.code()));
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp1, 3L), Errors.NONE.code()));
+        assertEquals(3L, consumer.position(tp0));
+        assertEquals(3L, consumer.position(tp1));
+
+        client.requests().clear();
+        consumer.unsubscribe();
+        consumer.close();
+    }
+
     @Test(expected = IllegalStateException.class)
     public void testPollWithNoSubscription() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c310a1bc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 5822646..3e3a0e1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -540,6 +540,68 @@ public class FetcherTest {
     }
 
     @Test
+    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.committed(tp, new OffsetAndMetadata(0));
+        subscriptions.pause(tp); // paused partition does not have a valid position
+        subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
+
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP),
+                               listOffsetResponse(Errors.NONE, 1L, 10L));
+        fetcher.updateFetchPositions(singleton(tp));
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp));
+        assertEquals(10, subscriptions.position(tp).longValue());
+    }
+
+    @Test
+    public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.pause(tp); // paused partition does not have a valid position
+
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
+                               listOffsetResponse(Errors.NONE, 1L, 0L));
+        fetcher.updateFetchPositions(singleton(tp));
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp));
+        assertEquals(0, subscriptions.position(tp).longValue());
+    }
+
+    @Test
+    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.committed(tp, new OffsetAndMetadata(0));
+        subscriptions.pause(tp); // paused partition does not have a valid position
+        subscriptions.seek(tp, 10);
+
+        fetcher.updateFetchPositions(singleton(tp));
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp));
+        assertEquals(10, subscriptions.position(tp).longValue());
+    }
+
+    @Test
+    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
+        subscriptions.assignFromUser(singleton(tp));
+        subscriptions.committed(tp, new OffsetAndMetadata(0));
+        subscriptions.seek(tp, 10);
+        subscriptions.pause(tp); // paused partition already has a valid position
+
+        fetcher.updateFetchPositions(singleton(tp));
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp));
+        assertFalse(subscriptions.isFetchable(tp)); // because tp is paused
+        assertTrue(subscriptions.hasValidPosition(tp));
+        assertEquals(10, subscriptions.position(tp).longValue());
+    }
+
+    @Test
     public void testGetAllTopics() {
         // sending response before request, as getTopicMetadata is a blocking call
         client.prepareResponse(newMetadataResponse(topicName, Errors.NONE).toStruct());

Reply via email to