lucasbru commented on code in PR #14920:
URL: https://github.com/apache/kafka/pull/14920#discussion_r1417484146


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -957,6 +966,57 @@ private void close(Duration timeout, boolean 
swallowException) {
         }
     }
 
+    /**
+     * Prior to closing the network thread, we need to make sure the following 
operations happen in the right sequence:
+     * 1. autocommit offsets
+     * 2. revoke all partitions
+     */
+    private void prepareShutdown(final Timer timer) {
+        if (!groupMetadata.isPresent())
+            return;
+
+        maybeAutoCommitSync(timer);
+        timer.update();
+        if (!subscriptions.hasAutoAssignedPartitions() || 
subscriptions.assignedPartitions().isEmpty())
+            return;
+
+        try {
+            // If the consumer is in a group, we will pause and revoke all 
assigned partitions
+            onLeavePrepare().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            timer.update();
+        } catch (Exception e) {
+            Exception exception = e;
+            if (e instanceof ExecutionException)
+                exception = (Exception) e.getCause();
+            throw new KafkaException("User rebalance callback throws an 
error", exception);
+        } finally {
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+        }
+    }
+
+    private void maybeAutoCommitSync(final Timer timer) {
+        if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumed = 
subscriptions.allConsumed();
+            try {
+                log.debug("Sending synchronous auto-commit of offsets {} on 
closing", allConsumed);
+                commitSync(allConsumed, 
Duration.ofMillis(timer.remainingMs()));
+            } catch (Exception e) {
+                // consistent with async auto-commit failures, we do not 
propagate the exception
+                log.warn("Synchronous auto-commit of offsets {} failed: {}", 
allConsumed, e.getMessage());
+            }
+        }
+    }
+
+    private CompletableFuture<Void> onLeavePrepare() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+        if (!subscriptions.hasAutoAssignedPartitions() || 
droppedPartitions.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // TODO: Invoke rebalanceListener via KAFKA-15276
+        return CompletableFuture.completedFuture(null);
+    }
+

Review Comment:
   Several people seem to agree that we should solve this as much as possible 
via an event. Is the new draft PR going to replace this PR, or should we try to 
merge this one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to