lucasbru commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499180116


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
         }
 
         NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(currentTimeMs, false);
+        membershipManager.onHeartbeatRequestSent();

Review Comment:
   Would it not be more consistent to run this one a leave heartbeat as well, 
just handle the state correctly internally?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
         return state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING;
     }
 
+    @Override
+    public void maybeRejoinStaleMember() {
+        if (state == MemberState.STALE) {
+            staleMemberAssignmentRelease.whenComplete((__, error) -> 
transitionToJoining());

Review Comment:
   Can we avoid enqueuing many of these completionstages while we haven't 
finished the `staleMemberAssignmentRelease`? although it looks like the 
operation is idempotent, it's not good to create an arbitrary number of Futures 
here (with every `PollApplicationEvent`, a new one is created!



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -132,11 +132,20 @@ public enum MemberState {
         PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, 
RECONCILING,
                 ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
 
-        LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+        // Transition from prepare leaving to leaving is the expected one in 
all close operations
+        // except for when the poll timer expires (ex. leave group due to 
unsubscribe or consumer
+        // close, where member triggers callbacks first while it continues 
sending heartbeat
+        // (PREPARE_LEAVE state) and  then sends the heartbeat to leave 
(LEAVING state).
+        // All other transitions directly to LEAVING are expected when the 
member leaves due to
+        // expired poll timer. In that case, the member sends the heartbeat to 
leave first, and
+        // then invokes callbacks to release assignment while STALE, not 
sending any more
+        // heartbeats while STALE because it has been already removed from the 
group on the broker.
+        LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, 
RECONCILING,

Review Comment:
   Couldn't we avoid this long comment and complexity by transitioning through 
`PREPARE_LEAVING` either way - even if we don't do much? I don't like how a 
corner case (poll timer expiration) complicates the state machine so much



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -564,7 +587,7 @@ public void transitionToJoining() {
      */
     @Override
     public CompletableFuture<Void> leaveGroup() {
-        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || 
state == MemberState.STALE) {

Review Comment:
   Do we really want to allow leave group when we are fenced already? As far as 
I can see we'd invoke `onPartitionsLost` when being fenced and then again 
`onPartitionsRevoked` when leave group. 
   
   Can we just use a `nonInGroup` check here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to