This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 74e6fe7 KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354) 74e6fe7 is described below commit 74e6fe715e35439f0c82f34eccdc45d2cd7e8227 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Thu Oct 1 17:57:00 2020 -0700 KAFKA-10134 Follow-up: Set the re-join flag in heartbeat failure (#9354) Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Boyang Chen <boy...@confluent.io> --- .../consumer/internals/AbstractCoordinator.java | 2 +- .../kafka/clients/consumer/KafkaConsumerTest.java | 36 +++++++++++++--------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index a565c21..db35500 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -942,7 +942,7 @@ public abstract class AbstractCoordinator implements Closeable { synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); - resetState(); + resetStateAndRejoin(); } synchronized void resetGenerationOnLeaveGroup() { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8cfbfd6..c5a5128 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -116,6 +116,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -1828,7 +1829,7 @@ public class KafkaConsumerTest { } @Test - public void testReturnRecordsDuringRebalance() { + public void testReturnRecordsDuringRebalance() throws InterruptedException { Time time = new MockTime(1L); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); @@ -1843,15 +1844,13 @@ public class KafkaConsumerTest { Node node = metadata.fetch().nodes().get(0); Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); - // a first poll with zero millisecond would not complete the rebalance - consumer.poll(Duration.ZERO); + // a poll with non-zero milliseconds would complete three round-trips (discover, join, sync) + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100L)); + return consumer.assignment().equals(Utils.mkSet(tp0, t2p0)); + }, "Does not complete rebalance in time"); assertEquals(Utils.mkSet(topic, topic2), consumer.subscription()); - assertEquals(Collections.emptySet(), consumer.assignment()); - - // a second poll with non-zero milliseconds would complete three round-trips (discover, join, sync) - consumer.poll(Duration.ofMillis(100L)); - assertEquals(Utils.mkSet(tp0, t2p0), consumer.assignment()); // prepare a response of the outstanding fetch so that we have data available on the next poll @@ -1904,7 +1903,6 @@ public class KafkaConsumerTest { // mock rebalance responses client.respondFrom(joinGroupFollowerResponse(assignor, 2, "memberId", "leaderId", Errors.NONE), coordinator); - client.prepareResponseFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator); // we need to poll 1) for getting the join response, and then send the sync request; // 2) for getting the sync response @@ -1920,12 +1918,19 @@ public class KafkaConsumerTest { fetches1.put(tp0, new FetchInfo(3, 1)); client.respondFrom(fetchResponse(fetches1), node); - records = consumer.poll(Duration.ZERO); + // now complete the rebalance + client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator); + + AtomicInteger count = new AtomicInteger(0); + TestUtils.waitForCondition(() -> { + ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(100L)); + return consumer.assignment().equals(Utils.mkSet(tp0, t3p0)) && count.addAndGet(recs.count()) == 1; + + }, "Does not complete rebalance in time"); // should have t3 but not sent yet the t3 records assertEquals(Utils.mkSet(topic, topic3), consumer.subscription()); assertEquals(Utils.mkSet(tp0, t3p0), consumer.assignment()); - assertEquals(1, records.count()); assertEquals(4L, consumer.position(tp0)); assertEquals(0L, consumer.position(t3p0)); @@ -1934,10 +1939,13 @@ public class KafkaConsumerTest { fetches1.put(t3p0, new FetchInfo(0, 100)); client.respondFrom(fetchResponse(fetches1), node); - records = consumer.poll(Duration.ZERO); + count.set(0); + TestUtils.waitForCondition(() -> { + ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(100L)); + return count.addAndGet(recs.count()) == 101; + + }, "Does not complete rebalance in time"); - // should have t3 but not sent yet the t3 records - assertEquals(101, records.count()); assertEquals(5L, consumer.position(tp0)); assertEquals(100L, consumer.position(t3p0));