This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7f376f6c16 KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)
f7f376f6c16 is described below

commit f7f376f6c162717e60e143b05fbd12ea2f347e3c
Author: Philip Nee <p...@confluent.io>
AuthorDate: Tue Feb 28 17:36:37 2023 -0800

    KAFKA-12639: Exit upon expired timer to prevent tight looping (#13190)
    
    In AbstractCoordinator#joinGroupIfNeeded, the joinGroup request is retried
    without proper backoff once the timer has expired. This is an uncommon
    scenario that possibly only appears during testing, but I think it makes
    sense to return and require the client to drive the join group via poll.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../consumer/internals/AbstractCoordinator.java    | 12 +++++--
 .../internals/AbstractCoordinatorTest.java         | 39 ++++++++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java         |  3 +-
 3 files changed, 50 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 558ce2b1169..fc01b14ab08 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -501,13 +501,19 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 }
 
                 if (exception instanceof UnknownMemberIdException ||
-                    exception instanceof IllegalGenerationException ||
-                    exception instanceof RebalanceInProgressException ||
-                    exception instanceof MemberIdRequiredException)
+                        exception instanceof IllegalGenerationException ||
+                        exception instanceof RebalanceInProgressException ||
+                        exception instanceof MemberIdRequiredException)
                     continue;
                 else if (!future.isRetriable())
                     throw exception;
 
+                // We need to return upon expired timer, in case if the client.poll returns immediately and the time
+                // has elapsed.
+                if (timer.isExpired()) {
+                    return false;
+                }
+
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 908dd7e4485..0a2b1e7ef8b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -1561,6 +1561,45 @@ public class AbstractCoordinatorTest {
         }
     }
 
+    @Test
+    public void testBackoffAndRetryUponRetriableError() {
+        this.mockTime = new MockTime();
+        long currentTimeMs = System.currentTimeMillis();
+        this.mockTime.setCurrentTimeMs(System.currentTimeMillis());
+
+        setupCoordinator(); // note: uses 100ms backoff
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+        // Retriable Exception
+        
mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+        mockClient.prepareResponse(joinGroupResponse(Errors.NONE)); // Retry 
w/o error
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+        coordinator.joinGroupIfNeeded(mockTime.timer(REQUEST_TIMEOUT_MS));
+
+        assertEquals(100, mockTime.milliseconds() - currentTimeMs, 1);
+    }
+
+    @Test
+    public void testReturnUponRetriableErrorAndExpiredTimer() throws 
InterruptedException {
+        setupCoordinator();
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        coordinator.ensureCoordinatorReady(mockTime.timer(0));
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        Timer t = mockTime.timer(500);
+        try {
+            Future<Boolean> attempt = executor.submit(() -> 
coordinator.joinGroupIfNeeded(t));
+            mockTime.sleep(500);
+            
mockClient.prepareResponse(joinGroupResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+            assertFalse(attempt.get());
+        } catch (Exception e) {
+            fail();
+        } finally {
+            executor.shutdownNow();
+            executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        }
+    }
+
     private AtomicBoolean prepareFirstHeartbeat() {
         final AtomicBoolean heartbeatReceived = new AtomicBoolean(false);
         mockClient.prepareResponse(body -> {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 36bf0ea6825..0f1256e1689 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1484,7 +1484,8 @@ public abstract class ConsumerCoordinatorTest {
                 Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2, 
1))));
         client.respond(joinGroupFollowerResponse(1, consumerId, "leader", 
Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.poll(time.timer(0));
+        assertFalse(client.hasInFlightRequests());
+        coordinator.poll(time.timer(1));
         assertTrue(coordinator.rejoinNeededOrPending());
 
         client.respond(request -> {

Reply via email to