lucasbru commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499180116
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long
currentTimeMs) {
}
NetworkClientDelegate.UnsentRequest request =
makeHeartbeatRequest(currentTimeMs, false);
+ membershipManager.onHeartbeatRequestSent();
Review Comment:
Would it not be more consistent to run this one a leave heartbeat as well,
just handle the state correctly internally?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
return state == MemberState.PREPARE_LEAVING || state ==
MemberState.LEAVING;
}
+ @Override
+ public void maybeRejoinStaleMember() {
+ if (state == MemberState.STALE) {
+ staleMemberAssignmentRelease.whenComplete((__, error) ->
transitionToJoining());
Review Comment:
Can we avoid enqueuing many of these completionstages while we haven't
finished the `staleMemberAssignmentRelease`? although it looks like the
operation is idempotent, it's not good to create an arbitrary number of Futures
here (with every `PollApplicationEvent`, a new one is created!
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -132,11 +132,20 @@ public enum MemberState {
PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE,
RECONCILING,
ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
- LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+ // Transition from prepare leaving to leaving is the expected one in
all close operations
+ // except for when the poll timer expires (ex. leave group due to
unsubscribe or consumer
+ // close, where member triggers callbacks first while it continues
sending heartbeat
+ // (PREPARE_LEAVE state) and then sends the heartbeat to leave
(LEAVING state).
+ // All other transitions directly to LEAVING are expected when the
member leaves due to
+ // expired poll timer. In that case, the member sends the heartbeat to
leave first, and
+ // then invokes callbacks to release assignment while STALE, not
sending any more
+ // heartbeats while STALE because it has been already removed from the
group on the broker.
+ LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING,
RECONCILING,
Review Comment:
Couldn't we avoid this long comment and complexity by transitioning through
`PREPARE_LEAVING` either way - even if we don't do much? I don't like how a
corner case (poll timer expiration) complicates the state machine so much
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -564,7 +587,7 @@ public void transitionToJoining() {
*/
@Override
public CompletableFuture<Void> leaveGroup() {
- if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+ if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL ||
state == MemberState.STALE) {
Review Comment:
Do we really want to allow leave group when we are fenced already? As far as
I can see we'd invoke `onPartitionsLost` when being fenced and then again
`onPartitionsRevoked` when leave group.
Can we just use a `nonInGroup` check here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]