This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2047fc3715 HOTFIX: only try to clear discover-coordinator future upon commit (#12244)
2047fc3715 is described below

commit 2047fc3715002899996ba3e41d16551d2067edad
Author: Guozhang Wang <wangg...@gmail.com>
AuthorDate: Mon Jun 6 11:05:41 2022 -0700

    HOTFIX: only try to clear discover-coordinator future upon commit (#12244)
    
    This is an alternative to #11631 for fixing KAFKA-13563.
    
    Instead of letting the consumer always try to discover the coordinator in
    poll() under either mode (subscribe / assign), we defer clearing the
    discover-coordinator future to async commits only. More specifically, under
    manual assignment there are only three places where we need the coordinator:
    
    * commitAsync (issued by the consumer itself or triggered by the caller);
      this is the case this patch fixes.
    * commitSync, where we already try to re-discover the coordinator.
    * committed (issued by the consumer itself based on the reset policy, or
      triggered by the caller), where we already try to re-discover the
      coordinator.
    
    The benefit is that under manual assignment, if none of the above three
    operations is triggered, we never discover the coordinator at all. The
    original fix in #11631 would let the consumer discover the coordinator even
    when none of those operations is required.
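    
    A minimal consumer-side sketch of the manual-assignment scenario this
    change targets (the class name, bootstrap address, group id, topic and
    partition below are illustrative assumptions, not part of the patch):
    
        import java.time.Duration;
        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;
    
        public class ManualAssignExample {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumed group id
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    TopicPartition tp = new TopicPartition("my-topic", 0);            // assumed topic/partition
                    consumer.assign(Collections.singleton(tp));  // manual assignment, no subscribe()
                    consumer.seek(tp, 0L);
    
                    while (true) {
                        // plain fetching: with this change, poll() alone no longer forces
                        // coordinator discovery under manual assignment
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        records.forEach(r -> System.out.println(r.offset()));
    
                        // the coordinator is only looked up when it is actually needed,
                        // e.g. by this async commit (or commitSync / committed)
                        consumer.commitAsync();
                    }
                }
            }
        }
    
    Without the commitAsync (or commitSync / committed) call, such a consumer
    never needs to discover the coordinator at all.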
    
    Reviewers: Luke Chen <show...@gmail.com>, David Jacot <dja...@confluent.io>
---
 .../consumer/internals/ConsumerCoordinator.java    | 23 +++++++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  1 -
 .../internals/ConsumerCoordinatorTest.java         | 57 ++++++++++++++++++++--
 3 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 51fa0b62ed..b853ff99e8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.time.Duration;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -548,14 +549,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 }
             }
         } else {
-            // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+            // For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+            // instead we only try to refresh metadata when necessary.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+                client.awaitMetadataUpdate(timer);
             }
+
+            // if there is pending coordinator requests, ensure they have a chance to be transmitted.
+            client.pollNoWakeup();
         }
 
         maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
@@ -1004,7 +1009,17 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         if (offsets.isEmpty()) {
             // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
             future = doCommitOffsetsAsync(offsets, callback);
-        } else if (!coordinatorUnknown()) {
+        } else if (!coordinatorUnknownAndUnready(time.timer(Duration.ZERO))) {
+            // we need to make sure coordinator is ready before committing, since
+            // this is for async committing we do not try to block, but just try once to
+            // clear the previous discover-coordinator future, resend, or get responses;
+            // if the coordinator is not ready yet then we would just proceed and put that into the
+            // pending requests, and future poll calls would still try to complete them.
+            //
+            // the key here though is that we have to try sending the discover-coordinator if
+            // it's not known or ready, since this is the only place we can send such request
+            // under manual assignment (there we would not have heartbeat thread trying to auto-rediscover
+            // the coordinator).
             future = doCommitOffsetsAsync(offsets, callback);
         } else {
             // we don't know the current coordinator, so try to find it and then send the commit
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ae54efc2a1..da3acf4983 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -717,7 +717,6 @@ public class KafkaConsumerTest {
         consumer.seekToEnd(singleton(tp0));
         consumer.seekToBeginning(singleton(tp1));
 
-        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
         client.prepareResponse(body -> {
             ListOffsetsRequest request = (ListOffsetsRequest) body;
             List<ListOffsetsPartition> partitions = request.topics().stream().flatMap(t -> {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 1fee84a8d8..180053ee22 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -514,10 +514,62 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // set timeout to 0 because we expect no requests sent
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertFalse(client.hasInFlightRequests());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertTrue(client.hasInFlightRequests());
+
+            client.respond(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(0));
+            assertFalse(coordinator.coordinatorUnknown());
+            // after we've discovered the coordinator we should send
+            // out the commit request immediately
+            assertTrue(client.hasInFlightRequests());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // set timeout to 0 because we expect no requests sent
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertFalse(client.hasInFlightRequests());
+
+        // should try to find coordinator since we are commit async
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
+            fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception);
+        });
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertTrue(client.hasInFlightRequests());
+
+        client.respond(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.timer(0));
         assertFalse(coordinator.coordinatorUnknown());
+        // after we've discovered the coordinator we should send
+        // out the commit request immediately
+        assertTrue(client.hasInFlightRequests());
     }
 
     @Test
@@ -2116,8 +2168,7 @@ public abstract class ConsumerCoordinatorTest {
 
     @Test
     public void testAutoCommitDynamicAssignment() {
-        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)
-        ) {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
             subscriptions.subscribe(singleton(topic1), rebalanceListener);
             joinAsFollowerAndReceiveAssignment(coordinator, singletonList(t1p));
             subscriptions.seek(t1p, 100);
