lianetm commented on code in PR #17165: URL: https://github.com/apache/kafka/pull/17165#discussion_r1772011078
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -246,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
      */
     private void process(final UnsubscribeEvent event) {
         if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+            System.out.println("UnsubscribeEvent: " + event);

Review Comment:
   Let's remove this please.

##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##########
@@ -244,6 +244,7 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
     }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
 
     assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
+    Thread.sleep(1000)

Review Comment:
   remove?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -466,6 +474,17 @@ String memberIdInfoForLog() {
      */
     public void onSubscriptionUpdated() {
         if (state == MemberState.UNSUBSCRIBED) {

Review Comment:
   I wonder if this check is still needed here? It made sense before because we were doing the actual join, but now we're just setting a flag (that looks more like a "subscriptionUpdated" var now). Then `maybeJoinGroup` is where we need to check that the state is `UNSUBSCRIBED` and `subscriptionUpdated` is set, and only then join. Makes sense?
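A minimal sketch of the suggestion above, to make the intended split concrete. Everything here is hypothetical and simplified: `MembershipSketch`, the `subscriptionUpdated` field name, and the choice to clear the flag on every `maybeJoinGroup` call are assumptions for illustration, not the actual `AbstractMembershipManager` code.

```java
// Hypothetical, simplified stand-in for the membership manager (not the Kafka class).
class MembershipSketch {
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

    private MemberState state = MemberState.UNSUBSCRIBED;

    // Assumed flag name; the PR may use a different one.
    private boolean subscriptionUpdated = false;

    // Only records that the subscription changed. No state check here,
    // because nothing is joined at this point.
    void onSubscriptionUpdated() {
        subscriptionUpdated = true;
    }

    // The state check lives here, where the join actually happens.
    void maybeJoinGroup() {
        if (subscriptionUpdated && state == MemberState.UNSUBSCRIBED) {
            state = MemberState.JOINING; // stands in for transitionToJoining()
        }
        // Assumption of this sketch: the flag is consumed on every call.
        subscriptionUpdated = false;
    }

    MemberState state() {
        return state;
    }
}
```

With this shape, calling `onSubscriptionUpdated()` alone leaves the member `UNSUBSCRIBED`; the transition to `JOINING` only happens on the following `maybeJoinGroup()` call.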
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -246,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
      */
     private void process(final UnsubscribeEvent event) {
         if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+            System.out.println("UnsubscribeEvent: " + event);
             CompletableFuture<Void> future = requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup();
+            System.out.println("UnsubscribeEvent: " + future.isCompletedExceptionally());

Review Comment:
   ditto

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -524,7 +543,7 @@ public void transitionToJoining() {
      * to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
-        if (isNotInGroup()) {
+        if (isNotInGroup() || state == MemberState.JOINING) {

Review Comment:
   I don't quite get how this change relates to this PR. Why do we need it here?
   
   Also, this effectively ignores the leave group (unsubscribe) if JOINING, which I would expect is not right (the member will remain JOINING and may even become STABLE, never leaving).
   
   Before this change, JOINING + consumer.unsubscribe => LEAVING (the member would run the full leave flow and attempt to send the leave HB).
   With this change, JOINING + consumer.unsubscribe => still JOINING. Is that the intention?
   
   I know that leaving while joining has challenges to solve so that it can be processed correctly (KIP-1082), but I would expect that we keep the intention we had here, even after the KIP-1082 fixes: we should attempt to leave the group when there's a call to unsubscribe while waiting for the join response (JOINING), because the broker may have already processed the join.

-- 
This is an automated message from the Apache Git Service.