lianetm commented on code in PR #17165:
URL: https://github.com/apache/kafka/pull/17165#discussion_r1772011078


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -246,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
      */
     private void process(final UnsubscribeEvent event) {
         if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+            System.out.println("UnsubscribeEvent: " + event);

Review Comment:
   Let's remove this, please.



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##########
@@ -244,6 +244,7 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
     }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
 
     assertEquals(s"Invalid topics: [${invalidTopicName}]", exception.getMessage)
+    Thread.sleep(1000)

Review Comment:
   remove?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -466,6 +474,17 @@ String memberIdInfoForLog() {
      */
     public void onSubscriptionUpdated() {
         if (state == MemberState.UNSUBSCRIBED) {

Review Comment:
   I wonder if this check is still needed here. It made sense before because we 
were doing the actual join, but now we're just setting a flag (which looks more 
like a `subscriptionUpdated` var now). `maybeJoinGroup` is then where we need to 
check that the state is `UNSUBSCRIBED` and `subscriptionUpdated` is set, and only 
then join. Makes sense?
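
   To make the suggestion concrete, here is a minimal sketch of the shape I have 
in mind. It is not the real `AbstractMembershipManager`: the class name 
`SubscriptionFlagSketch`, the reduced `MemberState` enum, and the `state()` 
accessor are made up for illustration, and the plain assignment stands in for 
`transitionToJoining()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for the membership manager; not the real Kafka class.
class SubscriptionFlagSketch {
    // Reduced set of states, just enough for the illustration.
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE }

    private MemberState state = MemberState.UNSUBSCRIBED;
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);

    // No state check here: only record that the subscription changed.
    void onSubscriptionUpdated() {
        subscriptionUpdated.set(true);
    }

    // Consume the flag, and join only if the member is currently UNSUBSCRIBED.
    void maybeJoinGroup() {
        boolean updated = subscriptionUpdated.getAndSet(false);
        if (updated && state == MemberState.UNSUBSCRIBED) {
            state = MemberState.JOINING; // stands in for transitionToJoining()
        }
    }

    MemberState state() {
        return state;
    }
}
```

   With this shape, calling `maybeJoinGroup()` without a prior subscription update 
is a no-op, and the state check lives in exactly one place.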



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -246,7 +252,9 @@ private void process(final SubscriptionChangeEvent ignored) {
      */
     private void process(final UnsubscribeEvent event) {
         if (requestManagers.consumerHeartbeatRequestManager.isPresent()) {
+            System.out.println("UnsubscribeEvent: " + event);
             CompletableFuture<Void> future = requestManagers.consumerHeartbeatRequestManager.get().membershipManager().leaveGroup();
+            System.out.println("UnsubscribeEvent: " + future.isCompletedExceptionally());

Review Comment:
   ditto



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -524,7 +543,7 @@ public void transitionToJoining() {
      * to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
-        if (isNotInGroup()) {
+        if (isNotInGroup() || state == MemberState.JOINING) {

Review Comment:
   I don't quite get how this change relates to this PR. Why do we need it 
here?
   
   Also, this effectively ignores the leave group (unsubscribe) if JOINING, 
which I would expect is not right (the member will remain JOINING and may even 
become STABLE, and never leave). Before this change, JOINING + 
consumer.unsubscribe => LEAVING (the member would run the full leave flow and 
attempt to send the leave HB). With this change, JOINING + consumer.unsubscribe 
=> still JOINING. Is that the intention?
   
   I know that leaving while joining has challenges to solve so that it can 
be processed correctly (KIP-1082), but I would expect that we keep the 
intention we had here, even after the KIP-1082 fixes: we should attempt to 
leave the group when there's a call to unsubscribe while waiting for the join 
response (JOINING), because the broker may have already processed the join.
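   
   A rough sketch of the two behaviours, in case it helps the discussion. 
Everything here is a simplified assumption rather than the real code: the class 
`LeaveWhileJoiningSketch` and the `skipLeaveWhenJoining` switch are hypothetical, 
`isNotInGroup()` is reduced to a single state, and the returned future only 
models "leave still in flight".

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of leaveGroup(); not the real AbstractMembershipManager.
class LeaveWhileJoiningSketch {
    enum MemberState { UNSUBSCRIBED, JOINING, STABLE, LEAVING }

    MemberState state;
    // true models the condition added in this PR, false models the previous behaviour.
    final boolean skipLeaveWhenJoining;

    LeaveWhileJoiningSketch(MemberState initial, boolean skipLeaveWhenJoining) {
        this.state = initial;
        this.skipLeaveWhenJoining = skipLeaveWhenJoining;
    }

    // Simplified assumption: only UNSUBSCRIBED counts as "not in group" here.
    boolean isNotInGroup() {
        return state == MemberState.UNSUBSCRIBED;
    }

    CompletableFuture<Void> leaveGroup() {
        if (isNotInGroup() || (skipLeaveWhenJoining && state == MemberState.JOINING)) {
            // Nothing is sent to the broker; the member keeps its current state.
            return CompletableFuture.completedFuture(null);
        }
        // Run the leave flow; the future stays pending until the leave HB goes out.
        state = MemberState.LEAVING;
        return new CompletableFuture<>();
    }
}
```

   With `skipLeaveWhenJoining = false`, a member in JOINING moves to LEAVING and 
the future stays pending; with `true`, the future completes immediately and the 
member is still JOINING, which is the case I'm worried about.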


