lucasbru commented on code in PR #14710:
URL: https://github.com/apache/kafka/pull/14710#discussion_r1397321134


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -180,6 +180,18 @@ public void maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets
         autocommit.setInflightCommitStatus(true);
     }
 
+    /**
+     * The consumer needs to send an auto commit during the shutdown
+     */
+    public List<NetworkClientDelegate.UnsentRequest> maybeAutoCommitOnClose() {

Review Comment:
   Can this be package-private?
   
   Could this return an `Optional` instead of a `List`?
   
   I'd drop the "OnClose" suffix because it doesn't seem to matter here. The
   method does a `maybeAutoCommit`; the fact that it's executed on close belongs
   to the calling context, no?
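   
   A rough sketch of the shape I have in mind, using hypothetical stand-in
   types. The real method would presumably return
   `Optional<NetworkClientDelegate.UnsentRequest>`; `AutoCommitSketch`, its
   constructor flag, and the empty-offsets check are assumptions made only for
   illustration:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for CommitRequestManager; not the real class.
final class AutoCommitSketch {
    // Hypothetical stand-in for NetworkClientDelegate.UnsentRequest.
    static final class UnsentRequest {
        final Map<String, Long> offsets;

        UnsentRequest(Map<String, Long> offsets) {
            this.offsets = offsets;
        }
    }

    private final boolean autoCommitEnabled;

    AutoCommitSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    // Package-private and without an "OnClose" suffix: the caller decides when to invoke it.
    // At most one request is produced, so Optional expresses that better than List.
    Optional<UnsentRequest> maybeAutoCommit(Map<String, Long> consumedOffsets) {
        // Assumption: nothing to send when auto commit is off or no offsets were consumed.
        if (!autoCommitEnabled || consumedOffsets.isEmpty())
            return Optional.empty();
        return Optional.of(new UnsentRequest(consumedOffsets));
    }
}
```

   The caller on the close path can then use `ifPresent` (or `isPresent`)
   instead of checking whether a list is empty.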



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java:
##########
@@ -35,7 +35,7 @@ public interface RequestManager {
      * synchronization protection in this method's implementation.
      *
      * <p/>
-     *
+     *Close(

Review Comment:
   Whoops, this looks like stray typing that ended up in the Javadoc.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -561,23 +583,27 @@ boolean hasUnsentRequests() {
 
         OffsetCommitRequestState addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {

Review Comment:
   nit: keep overloads next to each other in the file. Here
   `createOffsetCommitRequest` sits between the two overloads of
   `addOffsetCommitRequest`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -231,6 +243,17 @@ private void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs
         }
     }
 
+    @Override
+    public NetworkClientDelegate.PollResult pollOnClose() {
+        if (!pendingRequests.hasUnsentRequests() || !coordinatorRequestManager.coordinator().isPresent())
+            return EMPTY;
+
+        sendAutoCommit(subscriptions.allConsumed());

Review Comment:
   We seem to have two code paths dealing with the auto commit here, and I
   haven't quite grasped the idea behind them. For example, we seem to be
   creating the auto-commit request twice during close:
   
   ```
   ConsumerNetworkThread.coordinatorOnClose ->
     ConsumerNetworkThread.closingTasks ->
     ConsumerNetworkThread.maybeAutoCommitOnClose ->
     CommitRequestManager.maybeAutoCommitOnClose ->
     CommitRequestManager.createOffsetCommitRequest
   ```
   
   As well as:
   
   ```
   ConsumerNetworkThread.runAtClose ->
     CommitRequestManager.pollOnClose ->
     CommitRequestManager.sendAutoCommit ->
     CommitRequestManager.addOffsetCommitRequest ->
     CommitRequestManager.createOffsetCommitRequest
   ```
   
   Could you add a bit of context to the PR so that it's easier for me to
   understand the thinking behind the shutdown procedure?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);
+
+        List<NetworkClientDelegate.UnsentRequest> tasks = closingTasks();
+        do {
+            long currentTimeMs = timer.currentTimeMs();
+            connectCoordinator(timer);
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+    }
+
+    private void connectCoordinator(final Timer timer) {
+        while (!coordinatorReady()) {
+            findCoordinatorSync(timer);
+        }
+    }
+
+    private boolean coordinatorReady() {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        Optional<Node> coordinator = coordinatorRequestManager.coordinator();
+        return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get());
+    }
+
+    private void findCoordinatorSync(final Timer timer) {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        long currentTimeMs = timer.currentTimeMs();
+        NetworkClientDelegate.PollResult request = coordinatorRequestManager.pollOnClose();
+        networkClientDelegate.addAll(request);
+        CompletableFuture<ClientResponse> findCoordinatorRequest = request.unsentRequests.get(0).future();
+        while (timer.notExpired() && !findCoordinatorRequest.isDone()) {
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);

Review Comment:
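   This will probably work, but shouldn't we use `timer.currentTimeMs()` here?
   `currentTimeMs` is captured once before the loop, so every `poll` call is
   handed the same timestamp.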
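   
   To illustrate the point with a sketch: `SketchTimer` below is a hypothetical
   stand-in for `Timer`, assumed to cache the current time until `update()` is
   called, and the poll is only simulated by advancing a fake clock. Reading the
   time inside the loop gives each poll a fresh timestamp:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class PollClockSketch {
    // Hypothetical stand-in for Timer: caches "now" until update() is called.
    static final class SketchTimer {
        private final AtomicLong clock;
        private final long deadlineMs;
        private long cachedNowMs;

        SketchTimer(AtomicLong clock, long timeoutMs) {
            this.clock = clock;
            this.cachedNowMs = clock.get();
            this.deadlineMs = cachedNowMs + timeoutMs;
        }

        void update() { cachedNowMs = clock.get(); }
        long currentTimeMs() { return cachedNowMs; }
        boolean notExpired() { return cachedNowMs < deadlineMs; }
    }

    // Returns the timestamp handed to each simulated poll until the timer expires.
    static List<Long> pollUntilExpired(AtomicLong clock, long timeoutMs, long pollCostMs) {
        SketchTimer timer = new SketchTimer(clock, timeoutMs);
        List<Long> timestamps = new ArrayList<>();
        while (timer.notExpired()) {
            timestamps.add(timer.currentTimeMs()); // read inside the loop, not before it
            clock.addAndGet(pollCostMs);           // simulated poll that takes pollCostMs
            timer.update();                        // refresh the cached time
        }
        return timestamps;
    }
}
```

   With a local `currentTimeMs` captured before the loop, every entry in that
   list would instead be the starting time.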



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);
+
+        List<NetworkClientDelegate.UnsentRequest> tasks = closingTasks();
+        do {
+            long currentTimeMs = timer.currentTimeMs();
+            connectCoordinator(timer);
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+    }
+
+    private void connectCoordinator(final Timer timer) {
+        while (!coordinatorReady()) {
+            findCoordinatorSync(timer);
+        }
+    }
+
+    private boolean coordinatorReady() {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        Optional<Node> coordinator = coordinatorRequestManager.coordinator();
+        return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get());
+    }
+
+    private void findCoordinatorSync(final Timer timer) {
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        long currentTimeMs = timer.currentTimeMs();
+        NetworkClientDelegate.PollResult request = coordinatorRequestManager.pollOnClose();
+        networkClientDelegate.addAll(request);
+        CompletableFuture<ClientResponse> findCoordinatorRequest = request.unsentRequests.get(0).future();
+        while (timer.notExpired() && !findCoordinatorRequest.isDone()) {
+            networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+            timer.update();
+        }
+    }
+
+    private List<NetworkClientDelegate.UnsentRequest> maybeAutoCommitOnClose() {

Review Comment:
   This could also return an `Optional`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {

Review Comment:
   Try to use a verb in the method name. As it stands, you'd think this method
   returns the coordinator, or something.
   
   I'm not really sure what this does given the name. Maybe it waits for the
   closing tasks? Then call it `waitForClosingTasks`.
   
   Also, add a `// visible for testing` comment.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
     void cleanup() {
         log.trace("Closing the consumer network thread");
         Timer timer = time.timer(closeTimeout);
+        coordinatorOnClose(timer);
         runAtClose(requestManagers.entries(), networkClientDelegate, timer);
         closeQuietly(requestManagers, "request managers");
         closeQuietly(networkClientDelegate, "network client delegate");
         closeQuietly(applicationEventProcessor, "application event processor");
         log.debug("Closed the consumer network thread");
     }
+
+    void coordinatorOnClose(final Timer timer) {
+        if (!requestManagers.coordinatorRequestManager.isPresent())
+            return;
+
+        connectCoordinator(timer);

Review Comment:
   Do we need this here? `connectCoordinator` is called again three lines
   below, inside the loop.


