lucasbru commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1638000934


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1278,12 +1278,10 @@ void prepareShutdown(final Timer timer, final 
AtomicReference<Throwable> firstEx
             autoCommitSync(timer);
 
         applicationEventHandler.add(new CommitOnCloseEvent());
-        completeQuietly(
-            () -> {
-                maybeRevokePartitions();
-                applicationEventHandler.addAndGet(new 
LeaveOnCloseEvent(calculateDeadlineMs(timer)));
-            },
-            "Failed to send leaveGroup heartbeat with a timeout(ms)=" + 
timer.timeoutMs(), firstException);
+        completeQuietly(() -> maybeRevokePartitions(),

Review Comment:
   So the point here seems to be, if `maybeRevokePartitions` fails, likely due 
to rebalancelistener, still leave the group.
   
   It looks like a good change to me. I'm just surprised to see that the legacy 
consumer does not seem to do this? If `onPrepareLeave` in `AbstractCoordinator` 
fails, we won't reach `maybeLeaveGroup`. 
   
   So is this a bug also in the legacy consumer?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -158,7 +158,12 @@ public class AsyncKafkaConsumerTest {
     public void resetAll() {
         backgroundEventQueue.clear();
         if (consumer != null) {
-            consumer.close(Duration.ZERO);
+            try {
+                consumer.close(Duration.ZERO);
+            } catch (Exception e) {
+                // best effort to clean up after each test, but may throw (ex. 
if callbacks where
+                // throwing errors)

Review Comment:
   I did this before but was asked to not do it, see 
https://github.com/apache/kafka/pull/15613/files/9fb917e4b1e60f238183c92d1ad3bc2565a7e1ea#r1559907295.
 That's why I added a "clean-up close" in the tests where close fails, with an 
expected exception (search for "clean-up" in this file).
   
   I'd also be fine with your (and my original approach) to have a best-effort 
clean up and ignore exceptions here. But then, let's remove the "clean-up 
close" code in the other tests. Any consistent approach is fine with me here.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -673,7 +677,15 @@ public CompletableFuture<Void> leaveGroup() {
 
         CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
         callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("Member {} callback to release assignment failed. 
Member will proceed " +
+                    "to send leave group heartbeat", memberId, error);
+            } else {
+                log.debug("Member {} completed callback to release assignment 
and will send leave " +
+                    "group heartbeat", memberId);
+            }
             // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.unsubscribe();
             clearSubscription();

Review Comment:
   Is the name `clearSubscription` misleading? It seems like it clears the 
assignment, not the subscription. 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1324,6 +1322,7 @@ void completeQuietly(final Utils.ThrowingRunnable 
function,
         } catch (TimeoutException e) {
             log.debug("Timeout expired before the {} operation could 
complete.", msg);

Review Comment:
   Should we update `firstException` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to