lianetm commented on code in PR #16686: URL: https://github.com/apache/kafka/pull/16686#discussion_r1833410324
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) { } } - /** - * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence: - * 1. autocommit offsets - * 2. release assignment. This is done via a background unsubscribe event that will - * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker - */ - private void releaseAssignmentAndLeaveGroup(final Timer timer) { + private void autoCommitOnClose(final Timer timer) { if (!groupMetadata.get().isPresent()) return; if (autoCommitEnabled) commitSyncAllConsumed(timer); applicationEventHandler.add(new CommitOnCloseEvent()); + } + + private void releaseAssignmentOnClose(final Timer timer) { + ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null); + + if (cgm == null) + return; + + Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions(); + + if (assignedPartitions.isEmpty()) + // Nothing to revoke. + return; + + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + droppedPartitions.addAll(assignedPartitions); + + boolean isThreadInterrupted = Thread.currentThread().isInterrupted(); Review Comment: This was the review I mentioned: I started adding comments one day and finished the day after (hehe), so that's probably why. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org