This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 74e6fe7  KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354)
74e6fe7 is described below

commit 74e6fe715e35439f0c82f34eccdc45d2cd7e8227
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Thu Oct 1 17:57:00 2020 -0700

    KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354)
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Boyang Chen <boy...@confluent.io>
---
 .../consumer/internals/AbstractCoordinator.java    |  2 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 36 +++++++++++++---------
 2 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a565c21..db35500 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -942,7 +942,7 @@ public abstract class AbstractCoordinator implements Closeable {
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors 
error) {
         log.debug("Resetting generation after encountering {} from {} response 
and requesting re-join", error, api);
 
-        resetState();
+        resetStateAndRejoin();
     }
 
     synchronized void resetGenerationOnLeaveGroup() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8cfbfd6..c5a5128 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -116,6 +116,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
@@ -1828,7 +1829,7 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void testReturnRecordsDuringRebalance() {
+    public void testReturnRecordsDuringRebalance() throws InterruptedException 
{
         Time time = new MockTime(1L);
         SubscriptionState subscription = new SubscriptionState(new 
LogContext(), OffsetResetStrategy.EARLIEST);
         ConsumerMetadata metadata = createMetadata(subscription);
@@ -1843,15 +1844,13 @@ public class KafkaConsumerTest {
         Node node = metadata.fetch().nodes().get(0);
         Node coordinator = prepareRebalance(client, node, assignor, 
Arrays.asList(tp0, t2p0), null);
 
-        // a first poll with zero millisecond would not complete the rebalance
-        consumer.poll(Duration.ZERO);
+        // a poll with non-zero milliseconds would complete three round-trips 
(discover, join, sync)
+        TestUtils.waitForCondition(() -> {
+            consumer.poll(Duration.ofMillis(100L));
+            return consumer.assignment().equals(Utils.mkSet(tp0, t2p0));
+        }, "Does not complete rebalance in time");
 
         assertEquals(Utils.mkSet(topic, topic2), consumer.subscription());
-        assertEquals(Collections.emptySet(), consumer.assignment());
-
-        // a second poll with non-zero milliseconds would complete three 
round-trips (discover, join, sync)
-        consumer.poll(Duration.ofMillis(100L));
-
         assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment());
 
         // prepare a response of the outstanding fetch so that we have data 
available on the next poll
@@ -1904,7 +1903,6 @@ public class KafkaConsumerTest {
 
         // mock rebalance responses
         client.respondFrom(joinGroupFollowerResponse(assignor, 2, "memberId", 
"leaderId", Errors.NONE), coordinator);
-        client.prepareResponseFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), 
Errors.NONE), coordinator);
 
         // we need to poll 1) for getting the join response, and then send the 
sync request;
         //                 2) for getting the sync response
@@ -1920,12 +1918,19 @@ public class KafkaConsumerTest {
         fetches1.put(tp0, new FetchInfo(3, 1));
         client.respondFrom(fetchResponse(fetches1), node);
 
-        records = consumer.poll(Duration.ZERO);
+        // now complete the rebalance
+        client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), 
Errors.NONE), coordinator);
+
+        AtomicInteger count = new AtomicInteger(0);
+        TestUtils.waitForCondition(() -> {
+            ConsumerRecords<String, String> recs = 
consumer.poll(Duration.ofMillis(100L));
+            return consumer.assignment().equals(Utils.mkSet(tp0, t3p0)) && 
count.addAndGet(recs.count()) == 1;
+
+        }, "Does not complete rebalance in time");
 
         // should have t3 but not sent yet the t3 records
         assertEquals(Utils.mkSet(topic, topic3), consumer.subscription());
         assertEquals(Utils.mkSet(tp0, t3p0), consumer.assignment());
-        assertEquals(1, records.count());
         assertEquals(4L, consumer.position(tp0));
         assertEquals(0L, consumer.position(t3p0));
 
@@ -1934,10 +1939,13 @@ public class KafkaConsumerTest {
         fetches1.put(t3p0, new FetchInfo(0, 100));
         client.respondFrom(fetchResponse(fetches1), node);
 
-        records = consumer.poll(Duration.ZERO);
+        count.set(0);
+        TestUtils.waitForCondition(() -> {
+            ConsumerRecords<String, String> recs = 
consumer.poll(Duration.ofMillis(100L));
+            return count.addAndGet(recs.count()) == 101;
+
+        }, "Does not complete rebalance in time");
 
-        // should have t3 but not sent yet the t3 records
-        assertEquals(101, records.count());
         assertEquals(5L, consumer.position(tp0));
         assertEquals(100L, consumer.position(t3p0));
 

Reply via email to