lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1424029239


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean 
swallowException) {
         final Timer closeTimer = time.timer(timeout);
         clientTelemetryReporter.ifPresent(reporter -> 
reporter.initiateClose(timeout.toMillis()));
         closeTimer.update();
-
+        // Prepare shutting down the network thread
+        prepareShutdown(closeTimer, firstException);
+        closeTimer.update();
         if (applicationEventHandler != null)
-            closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed to close application event handler with a timeout(ms)=" + 
closeTimer.remainingMs(), firstException);
-
-        // Invoke all callbacks after the background thread exists in case if 
there are unsent async
-        // commits
-        maybeInvokeCommitCallbacks();
-
-        closeQuietly(fetchBuffer, "Failed to close the fetch buffer", 
firstException);
+            closeQuietly(() -> 
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), 
"Failed shutting down network thread", firstException);
+        closeTimer.update();
+        // Ensure all async commit callbacks are invoked

Review Comment:
   This comment looks misplaced.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean 
swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    void prepareShutdown(final Timer timer, final AtomicReference<Throwable> 
firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(autoCommitEnabled, timer, firstException);
+        timer.update();
+        applicationEventHandler.add(new CommitOnCloseApplicationEvent());
+        maybeRevokePartitions(timer, firstException);
+        waitOnCompletion(
+            () -> applicationEventHandler.addAndGet(new 
LeaveOnCloseApplicationEvent(), timer),
+            "leave group on close", timer, firstException);
+        maybeInvokeCommitCallbacks();
+    }
+
+    // Visible for testing
+    void maybeRevokePartitions(final Timer timer, final 
AtomicReference<Throwable> firstException) {
+        if (!subscriptions.hasAutoAssignedPartitions() || 
subscriptions.assignedPartitions().isEmpty())
+            return;
+        // TODO: We obviously needs to block until the partition revocation 
completes.
+        waitOnCompletion(this::invokePartitionRevocationListener, "revoke 
partitions", timer, firstException);
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+    }
+
+    // Visible for testing
+    void maybeAutoCommitSync(final boolean shouldAutoCommit,
+                             final Timer timer,
+                             final AtomicReference<Throwable> firstException) {
+        if (!shouldAutoCommit)
+            return;
+        waitOnCompletion(() -> {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = 
subscriptions.allConsumed();
+            log.debug("Sending synchronous auto-commit of offsets {} on 
closing", allConsumed);
+            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+        }, "autoCommitSync", timer, firstException);
+    }
+
+    // Visible for testing
+    void waitOnCompletion(final Runnable function,
+                                  final String taskName,
+                                  final Timer timer,
+                                  final AtomicReference<Throwable> 
firstException) {
+        try {
+            function.run();
+        } catch (Exception e) {
+            handleException(e, taskName, timer, Optional.of(firstException));
+        } finally {
+            timer.update();
+        }
+    }
+
+    private void handleException(final Exception e,
+                                 final String taskName,
+                                 final Timer timer,
+                                 final Optional<AtomicReference<Throwable>> 
firstException) {
+        if (e instanceof TimeoutException) {
+            log.debug("Timeout of {}ms expired before the {} operation could 
complete.", timer.remainingMs(), taskName);
+        } else {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            if (!firstException.isPresent())
+                log.debug("Failed to execute {} operation due to {}", 
taskName, exception.getMessage());
+            firstException.get().compareAndSet(null, exception);
+        }
+    }
+
+    private CompletableFuture<Void> invokePartitionRevocationListener() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || 
droppedPartitions.isEmpty())
+            return CompletableFuture.completedFuture(null);
+        // TODO: Invoke rebalanceListener via KAFKA-15276

Review Comment:
   Can we merge this PR without resolving this TODO?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##########
@@ -315,7 +315,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() {
         // the close() method with a Timer will NOT send out the close session 
requests on close. The network
         // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we 
need to run that logic here.
         ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), 
networkClientDelegate, timer);
-
+        // the network is polled during the last state of clean up.
+        networkClientDelegate.poll(time.timer(1));

Review Comment:
   Does Kirk need to confirm this change before we can merge the PR?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -178,27 +171,11 @@ static void runAtClose(final Collection<Optional<? 
extends RequestManager>> requ
                            final NetworkClientDelegate networkClientDelegate,
                            final Timer timer) {
         // These are the optional outgoing requests at the
-        List<NetworkClientDelegate.PollResult> pollResults = 
requestManagers.stream()
+        requestManagers.stream()

Review Comment:
   Can we merge it without resolving this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to