lucasbru commented on code in PR #14937:
URL: https://github.com/apache/kafka/pull/14937#discussion_r1418972399

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1050,6 +1062,81 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer, final AtomicReference<Throwable> firstException) {
+        if (!groupMetadata.isPresent())
+            return;
+        maybeAutoCommitSync(timer, firstException);
+        timer.update();
+        waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.COMMIT, timer.remainingMs()), timer, firstException);
+        maybeInvokeCommitCallbacks();
+        maybeRevokePartitions(timer, firstException);
+        waitOnEventCompletion(new ConsumerCloseApplicationEvent(ConsumerCloseApplicationEvent.Task.LEAVE_GROUP, timer.remainingMs()), timer, firstException);
+    }
+
+    private void waitOnEventCompletion(final ConsumerCloseApplicationEvent event,
+                                       final Timer timer,
+                                       final AtomicReference<Throwable> firstException) {
+        try {
+            applicationEventHandler.addAndGet(event, timer);
+        } catch (TimeoutException e) {
+            log.debug("Timeout of {}ms expired before the {} operation could complete.",
+                timer.remainingMs(),
+                event.task());
+        } catch (Exception e) {
+            firstException.compareAndSet(null, e);
+        } finally {
+            timer.update();
+        }
+    }
+
+    private void maybeRevokePartitions(final Timer timer, final AtomicReference<Throwable> firstException) {
+        if (!subscriptions.hasAutoAssignedPartitions() || subscriptions.assignedPartitions().isEmpty())
+            return;
+        try {
+            // If the consumer is in a group, we will pause and revoke all assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            firstException.compareAndSet(null, exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            timer.update();
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer, final AtomicReference<Throwable> firstException) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
+                commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
+            } catch (TimeoutException e) {
+                log.debug("Timeout of {}ms expired before the auto commit could complete.",
+                    timer.remainingMs());
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
+                firstException.compareAndSet(null, e);
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {

Review Comment:
   not sure if that is the best name to describe what it does


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -223,23 +232,54 @@ private void process(final TopicMetadataApplicationEvent event) {
         event.chain(future);
     }
 
+    private void processPrepClosingEvent(ConsumerCloseApplicationEvent event) {
+        switch (event.task()) {
+            case COMMIT:
+                log.debug("Sending unsent commit before closing.");
+                sendUnsentCommit();
+                event.future().complete(null);

Review Comment:
   Yeah, as long as our timeout did not expire, we probably want to wait for the response, right?
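
The exchange above turns on whether processPrepClosingEvent should complete the event's future right after enqueueing the commit, or only once the response has arrived or the close deadline has run out. Below is a minimal, self-contained sketch of the latter pattern; it is plain Java, not Kafka consumer code, and the class BoundedWait, the awaitWithinDeadline helper, and the simulated broker executor are invented purely for illustration.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Illustrative sketch only; none of this is Kafka code.
    public class BoundedWait {

        // Wait for an in-flight response, but never longer than the remaining close budget.
        // On timeout the caller logs and moves on instead of blocking shutdown indefinitely.
        static void awaitWithinDeadline(CompletableFuture<Void> response, long remainingMs) {
            try {
                response.get(remainingMs, TimeUnit.MILLISECONDS);
                System.out.println("Response received within the deadline");
            } catch (TimeoutException e) {
                System.out.println("Deadline of " + remainingMs + "ms expired before the response arrived");
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("Request failed: " + e);
            }
        }

        public static void main(String[] args) {
            ScheduledExecutorService broker = Executors.newSingleThreadScheduledExecutor();
            CompletableFuture<Void> commitResponse = new CompletableFuture<>();
            // Simulate the commit response arriving after 100 ms.
            broker.schedule(() -> { commitResponse.complete(null); }, 100, TimeUnit.MILLISECONDS);

            // With 500 ms left on the close timer we wait for the response; with 0 ms we would give up immediately.
            awaitWithinDeadline(commitResponse, 500);
            broker.shutdown();
        }
    }

The same bounded-wait shape is what a close path needs if it is to respect the caller's overall timeout while still giving the broker a chance to acknowledge the final commit.
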
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -274,79 +269,18 @@ private void closeInternal(final Duration timeout) {
         }
     }
 
     void cleanup() {
+        log.trace("Closing the consumer network thread");
+        Timer timer = time.timer(closeTimeout);
         try {
-            log.trace("Closing the consumer network thread");
-            Timer timer = time.timer(closeTimeout);
-            maybeAutocommitOnClose(timer);
             runAtClose(requestManagers.entries(), networkClientDelegate, timer);
-            maybeLeaveGroup(timer);
         } catch (Exception e) {
             log.error("Unexpected error during shutdown. Proceed with closing.", e);
         } finally {
+            networkClientDelegate.awaitPendingRequests(timer);

Review Comment:
   Can there be other requests (not tied to the closing application events) that we want to wait for as long as we still have time?


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -310,10 +310,11 @@ public void resetAutoCommitTimer() {
      */
     @Override
     public NetworkClientDelegate.PollResult pollOnClose() {
-        if (!pendingRequests.hasUnsentRequests() || !coordinatorRequestManager.coordinator().isPresent())
+        if (!pendingRequests.hasUnsentRequests())
             return EMPTY;
 
         List<NetworkClientDelegate.UnsentRequest> requests = pendingRequests.drainOnClose();
+        System.out.print("ddraining + " + requests);

Review Comment:
   remove


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -194,11 +189,11 @@ static void runAtClose(final Collection<Optional<? extends RequestManager>> requ
 
         // Poll to ensure that request has been written to the socket. Wait until either the timer has expired or until
         // all requests have received a response.
-        do {
+        while (timer.notExpired() && !requestFutures.stream().allMatch(Future::isDone)) {

Review Comment:
   Why are you changing this back?
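
On that last point, the practical difference between the two loop shapes is that do/while always performs at least one poll pass, even if the close timer has already expired, whereas the while form can skip polling entirely. The following minimal sketch shows that difference; it is plain Java, and CloseLoopShapes, pollOnce and allDone are invented stand-ins rather than the ConsumerNetworkThread code.

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    // Illustrative sketch only; none of this is Kafka code.
    public class CloseLoopShapes {

        // Stand-in for one network poll pass during close.
        static void pollOnce() {
            System.out.println("poll");
        }

        static boolean allDone(List<? extends Future<?>> futures) {
            return futures.stream().allMatch(Future::isDone);
        }

        // do/while: polls at least once, even when the deadline has already passed.
        static void closeWithDoWhile(List<? extends Future<?>> futures, long deadlineMs) {
            do {
                pollOnce();
            } while (System.currentTimeMillis() < deadlineMs && !allDone(futures));
        }

        // while: may skip polling entirely when the deadline has already passed.
        static void closeWithWhile(List<? extends Future<?>> futures, long deadlineMs) {
            while (System.currentTimeMillis() < deadlineMs && !allDone(futures)) {
                pollOnce();
            }
        }

        public static void main(String[] args) {
            List<CompletableFuture<Void>> pending = List.of(new CompletableFuture<Void>());
            long expiredDeadline = System.currentTimeMillis(); // close budget already used up

            closeWithDoWhile(pending, expiredDeadline); // prints "poll" once
            closeWithWhile(pending, expiredDeadline);   // prints nothing
        }
    }

Which shape is right depends on whether a final poll pass is needed to flush requests that were already written, even when no time is left; that is essentially what the review question is probing.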