philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398696969


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);
+
+        List<NetworkClientDelegate.UnsentRequest> tasks = closingTasks();
+        do {
+            long currentTimeMs = timer.currentTimeMs();
+            connectCoordinator(timer);
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+        } while (timer.notExpired() && !tasks.stream().allMatch(v -> 
v.future().isDone()));
+    }
+
+    private void connectCoordinator(final Timer timer) {
+        while (!coordinatorReady()) {
+            findCoordinatorSync(timer);
+        }
+    }
+
+    private boolean coordinatorReady() {
+        CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+        Optional<Node> coordinator = coordinatorRequestManager.coordinator();
+        return coordinator.isPresent() && 
!networkClientDelegate.isUnavailable(coordinator.get());
+    }
+
+    private void findCoordinatorSync(final Timer timer) {
+        CoordinatorRequestManager coordinatorRequestManager = 
requestManagers.coordinatorRequestManager.get();
+        long currentTimeMs = timer.currentTimeMs();
+        NetworkClientDelegate.PollResult request = 
coordinatorRequestManager.pollOnClose();
+        networkClientDelegate.addAll(request);
+        CompletableFuture<ClientResponse> findCoordinatorRequest = 
request.unsentRequests.get(0).future();
+        while (timer.notExpired() && !findCoordinatorRequest.isDone()) {
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+            timer.update();
+        }
+    }
+
+    private List<NetworkClientDelegate.UnsentRequest> maybeAutoCommitOnClose() 
{

Review Comment:
   I kept it as a list because I think it eliminates the need to check ifPresent 
in the closingTasks().  What it does is gather all of the requests that need to 
be sent during the shutdown from different request managers.  Currently we only 
have 1, i.e. the commitRequestManager.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to