kirktrue commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (!coordinatorRequestManager.coordinator().isPresent() ||
- membershipManager.shouldSkipHeartbeat() ||
- pollTimer.isExpired()) {
+ membershipManager.shouldSkipHeartbeat()) {
membershipManager.onHeartbeatRequestSkipped();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
- if (pollTimer.isExpired()) {
- logger.warn("consumer poll timeout has expired. This means the
time between subsequent calls to poll() " +
- "was longer than the configured max.poll.interval.ms, which
typically implies that " +
- "the poll loop is spending too much time processing messages.
You can address this " +
- "either by increasing max.poll.interval.ms or by reducing the
maximum size of batches " +
- "returned in poll() with max.poll.records.");
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the
time between " +
+ "subsequent calls to poll() was longer than the configured
max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too
much time processing " +
+ "messages. You can address this either by increasing
max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with
max.poll.records.");
+
Review Comment:
Should the `!membershipManager.isLeavingGroup()` if clause be a nested `if`
_inside_ the `if (pollTimer.isExpired()`? That is, do we really want to
continue down to line 212 if the timer is expired but it's not already leaving
the group?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -335,8 +335,13 @@ public int memberEpoch() {
return memberEpoch;
}
+ /**
+ * @return True if there hasn't been a call to consumer.poll() withing the
max.poll.interval.
+ * In that case, it is expected that the member will leave the group and
rejoin on the next
+ * call to consumer.poll().
+ */
@Override
- public boolean isStaled() {
+ public boolean isStale() {
Review Comment:
Thank you!!!!! 😄
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -335,8 +335,13 @@ public int memberEpoch() {
return memberEpoch;
}
+ /**
+ * @return True if there hasn't been a call to consumer.poll() withing the
max.poll.interval.
+ * In that case, it is expected that the member will leave the group and
rejoin on the next
+ * call to consumer.poll().
+ */
@Override
- public boolean isStaled() {
+ public boolean isStale() {
Review Comment:
Any reason we don't just expose the inner state via a `state()` method so
that we don't have to write `isStateA`, `isStateB`, `isStateC`, etc.?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
@Override
public void onHeartbeatRequestSent() {
MemberState state = state();
- if (isStaled()) {
- log.debug("Member {} is staled and is therefore leaving the group.
It will rejoin upon the next poll.", memberEpoch);
- // TODO: Integrate partition revocation/loss callback
- transitionToJoining();
- return;
- }
-
Review Comment:
For the uninitiated (like myself), would you consider adding a really brief
comment that provides pointers to where some of the other states (e.g. `STALE`)
are handled?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager {
MemberState state();
/**
- * @return True if the member is staled due to expired poll timer.
+ * @return True if the poll timer expired, indicating that there hasn't
been a call to
+ * consumer poll within the max poll interval. In this case, the member
will proactively
+ * leave the group, and rejoin on the next call to poll.
*/
- boolean isStaled();
+ boolean isStale();
Review Comment:
Thank you!!!!! 😄
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]