kirktrue commented on code in PR #15375: URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -188,18 +188,18 @@ public HeartbeatRequestManager( @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { if (!coordinatorRequestManager.coordinator().isPresent() || - membershipManager.shouldSkipHeartbeat() || - pollTimer.isExpired()) { + membershipManager.shouldSkipHeartbeat()) { membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); - if (pollTimer.isExpired()) { - logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + - "was longer than the configured max.poll.interval.ms, which typically implies that " + - "the poll loop is spending too much time processing messages. You can address this " + - "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + - "returned in poll() with max.poll.records."); + if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { + logger.warn("Consumer poll timeout has expired. This means the time between " + + "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " + + "which typically implies that the poll loop is spending too much time processing " + + "messages. You can address this either by increasing max.poll.interval.ms or by " + + "reducing the maximum size of batches returned in poll() with max.poll.records."); + Review Comment: Should the `!membershipManager.isLeavingGroup()` if clause be a nested `if` _inside_ the `if (pollTimer.isExpired()`? That is, do we really want to continue down to line 212 if the timer is expired but it's not already leaving the group? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -335,8 +335,13 @@ public int memberEpoch() { return memberEpoch; } + /** + * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval. + * In that case, it is expected that the member will leave the group and rejoin on the next + * call to consumer.poll(). + */ @Override - public boolean isStaled() { + public boolean isStale() { Review Comment: Thank you!!!!! 😄 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -335,8 +335,13 @@ public int memberEpoch() { return memberEpoch; } + /** + * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval. + * In that case, it is expected that the member will leave the group and rejoin on the next + * call to consumer.poll(). + */ @Override - public boolean isStaled() { + public boolean isStale() { Review Comment: Any reason we don't just expose the inner state via a `state()` method so that we don't have to write `isStateA`, `isStateB`, `isStateC`, etc.? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() { @Override public void onHeartbeatRequestSent() { MemberState state = state(); - if (isStaled()) { - log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); - // TODO: Integrate partition revocation/loss callback - transitionToJoining(); - return; - } - Review Comment: For the uninitiated (like myself), would you consider adding a really brief comment that provides pointers to where some of the other states (e.g. `STALE`) are handled? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ########## @@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager { MemberState state(); /** - * @return True if the member is staled due to expired poll timer. + * @return True if the poll timer expired, indicating that there hasn't been a call to + * consumer poll within the max poll interval. In this case, the member will proactively + * leave the group, and rejoin on the next call to poll. */ - boolean isStaled(); + boolean isStale(); Review Comment: Thank you!!!!! 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org