philipnee commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1398540947


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);

Review Comment:
   If the coordinator is disconnected, we first need to reconnect to it in 
order to send the commits (and perform other tasks in the future).  The 
connectCoordinator() call in the do { } while loop retries the connection in 
case the node is disconnected.  This is similar to the code in 
`ConsumerCoordinator` here:
   
   ```
   try {
       maybeAutoCommitOffsetsSync(timer);
       while (pendingAsyncCommits.get() > 0 && timer.notExpired()) {
           ensureCoordinatorReady(timer);
           client.poll(timer);
           invokeCompletedOffsetCommitCallbacks();
       }
   } finally {
       super.close(timer);
   }
   ```
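
   To make the pattern concrete, here is a rough, self-contained sketch of the "reconnect inside the loop, then poll" idea. It is only a toy model, not the code in this PR: `FakeCoordinator`, `drainOnClose`, `dropAtPoll`, and `maxPolls` are all hypothetical names, and `maxPolls` stands in for the `Timer`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CloseLoopSketch {

    /** Hypothetical stand-in for the coordinator node; not a Kafka class. */
    static class FakeCoordinator {
        private boolean connected;
        private final int dropAtPoll; // poll number after which the node disconnects once
        private int polls;
        private int connectAttempts;

        FakeCoordinator(boolean connected, int dropAtPoll) {
            this.connected = connected;
            this.dropAtPoll = dropAtPoll;
        }

        boolean isConnected() { return connected; }

        void connect() {
            connectAttempts++;
            connected = true;
        }

        /** In this toy model, each poll completes exactly one pending commit. */
        void poll(AtomicInteger pendingCommits) {
            polls++;
            pendingCommits.decrementAndGet();
            if (polls == dropAtPoll) {
                connected = false; // simulate the node dropping mid-close
            }
        }

        int connectAttempts() { return connectAttempts; }
    }

    /**
     * Drains pending commits, re-checking the connection before every poll.
     * Returns the number of commits still pending when the loop exits.
     */
    static int drainOnClose(FakeCoordinator coordinator, AtomicInteger pendingCommits, int maxPolls) {
        int polls = 0;
        while (pendingCommits.get() > 0 && polls < maxPolls) {
            if (!coordinator.isConnected()) {
                coordinator.connect(); // plays the role of ensureCoordinatorReady / connectCoordinator
            }
            coordinator.poll(pendingCommits);
            polls++;
        }
        return pendingCommits.get();
    }

    public static void main(String[] args) {
        FakeCoordinator coordinator = new FakeCoordinator(false, 1);
        AtomicInteger pending = new AtomicInteger(3);
        int left = drainOnClose(coordinator, pending, 10);
        System.out.println("left=" + left + ", connectAttempts=" + coordinator.connectAttempts());
    }
}
```

   In this model the coordinator starts disconnected and drops once more after the first poll, so the loop connects twice and still drains all three commits. Connecting only once before the loop would leave the later polls running against a disconnected node.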


