lucasbru commented on code in PR #14937: URL: https://github.com/apache/kafka/pull/14937#discussion_r1424029239
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1026,15 +1038,13 @@ private void close(Duration timeout, boolean swallowException) { final Timer closeTimer = time.timer(timeout); clientTelemetryReporter.ifPresent(reporter -> reporter.initiateClose(timeout.toMillis())); closeTimer.update(); - + // Prepare shutting down the network thread + prepareShutdown(closeTimer, firstException); + closeTimer.update(); if (applicationEventHandler != null) - closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed to close application event handler with a timeout(ms)=" + closeTimer.remainingMs(), firstException); - - // Invoke all callbacks after the background thread exists in case if there are unsent async - // commits - maybeInvokeCommitCallbacks(); - - closeQuietly(fetchBuffer, "Failed to close the fetch buffer", firstException); + closeQuietly(() -> applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())), "Failed shutting down network thread", firstException); + closeTimer.update(); + // Ensure all async commit callbacks are invoked Review Comment: misplaced comment ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1050,6 +1060,85 @@ private void close(Duration timeout, boolean swallowException) { } } + /** + * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: + * 1. autocommit offsets + * 2. 
revoke all partitions + */ + void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) { + if (!groupMetadata.isPresent()) + return; + maybeAutoCommitSync(autoCommitEnabled, timer, firstException); + timer.update(); + applicationEventHandler.add(new CommitOnCloseApplicationEvent()); + maybeRevokePartitions(timer, firstException); + waitOnCompletion( + () -> applicationEventHandler.addAndGet(new LeaveOnCloseApplicationEvent(), timer), + "leave group on close", timer, firstException); + maybeInvokeCommitCallbacks(); + } + + // Visible for testing + void maybeRevokePartitions(final Timer timer, final AtomicReference<Throwable> firstException) { + if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty()) + return; + // TODO: We obviously needs to block until the partition revocation completes. + waitOnCompletion(this::invokePartitionRevocationListener, "revoke partitions", timer, firstException); + subscriptions.assignFromSubscribed(Collections.emptySet()); + } + + // Visible for testing + void maybeAutoCommitSync(final boolean shouldAutoCommit, + final Timer timer, + final AtomicReference<Throwable> firstException) { + if (!shouldAutoCommit) + return; + waitOnCompletion(() -> { + Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed(); + log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed); + commitSync(allConsumed, Duration.ofMillis(timer.remainingMs())); + }, "autoCommitSync", timer, firstException); + } + + // Visible for testing + void waitOnCompletion(final Runnable function, + final String taskName, + final Timer timer, + final AtomicReference<Throwable> firstException) { + try { + function.run(); + } catch (Exception e) { + handleException(e, taskName, timer, Optional.of(firstException)); + } finally { + timer.update(); + } + } + + private void handleException(final Exception e, + final String taskName, + final Timer timer, + final 
Optional<AtomicReference<Throwable>> firstException) { + if (e instanceof TimeoutException) { + log.debug("Timeout of {}ms expired before the {} operation could complete.", timer.remainingMs(), taskName); + } else { + Exception exception = e; + if (e instanceof ExecutionException) + exception = (Exception) e.getCause(); + if (!firstException.isPresent()) + log.debug("Failed to execute {} operation due to {}", taskName, exception.getMessage()); + firstException.get().compareAndSet(null, exception); + } + } + + private CompletableFuture<Void> invokePartitionRevocationListener() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + if (!subscriptions.hasAutoAssignedPartitions() || droppedPartitions.isEmpty()) + return CompletableFuture.completedFuture(null); + // TODO: Invoke rebalanceListener via KAFKA-15276 Review Comment: Can we merge it without resolving this comment? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java: ########## @@ -315,7 +315,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() { // the close() method with a Timer will NOT send out the close session requests on close. The network // I/O logic is handled inside ConsumerNetworkThread.runAtClose, so we need to run that logic here. ConsumerNetworkThread.runAtClose(singletonList(Optional.of(fetcher)), networkClientDelegate, timer); - + // the network is polled during the last state of clean up. + networkClientDelegate.poll(time.timer(1)); Review Comment: Does Kirk need to confirm this change before we can merge the PR? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -178,27 +171,11 @@ static void runAtClose(final Collection<Optional<? 
extends RequestManager>> requ final NetworkClientDelegate networkClientDelegate, final Timer timer) { // These are the optional outgoing requests at the - List<NetworkClientDelegate.PollResult> pollResults = requestManagers.stream() + requestManagers.stream() Review Comment: Can we merge it without resolving this comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org