lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418972399


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean 
swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer, final 
AtomicReference<Throwable> firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(timer, firstException);
+        timer.update();
+        waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, 
timer.remainingMs()), timer, firstException);
+        maybeInvokeCommitCallbacks();
+        maybeRevokePartitions(timer, firstException);
+        waitOnEventCompletion(new 
ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, 
timer.remainingMs()), timer, firstException);
+    }
+
+    private void waitOnEventCompletion(final ConsumerCloseApplicationEvent 
event,
+                                       final Timer timer,
+                                       final AtomicReference<Throwable> 
firstException) {
+        try {
+            applicationEventHandler.addAndGet(event, timer);
+        } catch (TimeoutException e) {
+            log.debug("Timeout of {}ms expired before the {} operation could 
complete.",
+                timer.remainingMs(),
+                event.task());
+        } catch (Exception e) {
+            firstException.compareAndSet(null, e);
+        } finally {
+            timer.update();
+        }
+    }
+
+    private void maybeRevokePartitions(final Timer timer, final 
AtomicReference<Throwable> firstException) {
+        if (!subscriptions.hasAutoAssignedPartitions() || 
subscriptions.assignedPartitions().isEmpty())
+            return;
+        try {
+            // If the consumer is in a group, we will pause and revoke all 
assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            firstException.compareAndSet(null, exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            timer.update();
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer, final 
AtomicReference<Throwable> firstException) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = 
subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on 
closing", allConsumed);
+                commitSync(allConsumed, 
Duration.ofMillis(timer.remainingMs()));
+            } catch (TimeoutException e) {
+                log.debug("Timeout of {}ms expired before the auto commit 
could complete.",
+                    timer.remainingMs());
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not 
propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", 
allConsumed, e.getMessage());
+                firstException.compareAndSet(null, e);
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {

Review Comment:
   not sure if that is the best name to describe what it does



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent 
event) {
         event.chain(future);
     }
 
+    private void processPrepClosingEvent(ConsumerCloseApplicationEvent event) {
+        switch (event.task()) {
+            case COMMIT:
+                log.debug("Sending unsent commit before closing.");
+                sendUnsentCommit();
+                event.future().complete(null);

Review Comment:
   Yeah, as long as our timeout did not expire, we probably want to wait for 
the response, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
     }
 
     void cleanup() {
+        log.trace("Closing the consumer network thread");
+        Timer timer = time.timer(closeTimeout);
         try {
-            log.trace("Closing the consumer network thread");
-            Timer timer = time.timer(closeTimeout);
-            maybeAutocommitOnClose(timer);
             runAtClose(requestManagers.entries(), networkClientDelegate, 
timer);
-            maybeLeaveGroup(timer);
         } catch (Exception e) {
             log.error("Unexpected error during shutdown.  Proceed with 
closing.", e);
         } finally {
+            networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   Can there be other requests (not tied to the closing application events) 
that we want to wait for as long as we still have time?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -310,10 +310,11 @@ public void resetAutoCommitTimer() {
      */
     @Override
     public NetworkClientDelegate.PollResult pollOnClose() {
-        if (!pendingRequests.hasUnsentRequests() || 
!coordinatorRequestManager.coordinator().isPresent())
+        if (!pendingRequests.hasUnsentRequests())
             return EMPTY;
 
         List<NetworkClientDelegate.UnsentRequest> requests = 
pendingRequests.drainOnClose();
+        System.out.print("ddraining + " + requests);

Review Comment:
   remove



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -194,11 +189,11 @@ static void runAtClose(final Collection<Optional<? 
extends RequestManager>> requ
 
         // Poll to ensure that request has been written to the socket. Wait 
until either the timer has expired or until
         // all requests have received a response.
-        do {
+        while (timer.notExpired() && 
!requestFutures.stream().allMatch(Future::isDone)) {

Review Comment:
   Why are you changing this back?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to