AndrewJSchofield commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
return currentAssignment.equals(currentTargetAssignment);
}
+ /**
+ * @return True if the member should not send heartbeats, which would be one of the following
+ * cases:
+ * <ul>
+ * <li>Member is not subscribed to any topics</li>
+ * <li>Member has received a fatal error in a previous heartbeat response</li>
+ * <li>Member is stale, meaning that it has left the group due to expired poll timer</li>
+ * </ul>
+ */
@Override
public boolean shouldSkipHeartbeat() {
MemberState state = state();
- return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+ return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+ }
+
+ /**
+ * @return True if the member is preparing to leave the group (waiting for callbacks), or
+ * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+ * the consumer poll timer expires.
+ */
+ public boolean isLeavingGroup() {
+ MemberState state = state();
+ return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
}
/**
Review Comment:
I think that, technically, it doesn't require the *user* to poll in order to
rejoin. The code polls more frequently if the user's poll timeout is long
enough.
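The two predicates in the MembershipManagerImpl hunk are plain membership tests over the member state. As a minimal sketch, assuming a reduced stand-in enum (only the states named in the diff plus STABLE, which stands in for any active state; the real `MemberState` has more values) and a hypothetical class name `MemberStateChecks`, they can be modeled with `EnumSet`:

```java
import java.util.EnumSet;

// Hypothetical model of the two predicates in the MembershipManagerImpl hunk.
// MemberState is a reduced stand-in: the states named in the diff plus STABLE,
// which represents any active, heartbeating state.
public class MemberStateChecks {

    public enum MemberState { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, FATAL, STALE }

    // States in which no heartbeat is sent at all (STALE is the state added by this change).
    private static final EnumSet<MemberState> SKIP_HEARTBEAT =
            EnumSet.of(MemberState.UNSUBSCRIBED, MemberState.FATAL, MemberState.STALE);

    // States in which the member is already leaving: waiting for callbacks,
    // or sending its last heartbeat.
    private static final EnumSet<MemberState> LEAVING_GROUP =
            EnumSet.of(MemberState.PREPARE_LEAVING, MemberState.LEAVING);

    public static boolean shouldSkipHeartbeat(MemberState state) {
        return SKIP_HEARTBEAT.contains(state);
    }

    public static boolean isLeavingGroup(MemberState state) {
        return LEAVING_GROUP.contains(state);
    }

    public static void main(String[] args) {
        // Print both predicates for every modeled state.
        for (MemberState state : MemberState.values()) {
            System.out.printf("%-16s shouldSkipHeartbeat=%-5b isLeavingGroup=%b%n",
                    state, shouldSkipHeartbeat(state), isLeavingGroup(state));
        }
    }
}
```

The two sets are disjoint, so a member in PREPARE_LEAVING or LEAVING still heartbeats, which is what allows it to send its last heartbeat. `EnumSet` is used here only for readability; it is equivalent to the chained `==` comparisons in the diff.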
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
if (!coordinatorRequestManager.coordinator().isPresent() ||
- membershipManager.shouldSkipHeartbeat() ||
- pollTimer.isExpired()) {
+ membershipManager.shouldSkipHeartbeat()) {
membershipManager.onHeartbeatRequestSkipped();
return NetworkClientDelegate.PollResult.EMPTY;
}
pollTimer.update(currentTimeMs);
- if (pollTimer.isExpired()) {
- logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
- "was longer than the configured max.poll.interval.ms, which typically implies that " +
- "the poll loop is spending too much time processing messages. You can address this " +
- "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
- "returned in poll() with max.poll.records.");
+ if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+ logger.warn("Consumer poll timeout has expired. This means the time between " +
+ "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+ "which typically implies that the poll loop is spending too much time processing " +
+ "messages. You can address this either by increasing max.poll.interval.ms or by " +
+ "reducing the maximum size of batches returned in poll() with max.poll.records.");
+
Review Comment:
I think the logic as written is correct.
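The reordered checks in `poll()` can be traced with a self-contained sketch. `PollTimer`, `Outcome` and the boolean parameters are hypothetical stand-ins for `pollTimer`, `coordinatorRequestManager` and `membershipManager`, not Kafka classes. The quoted hunk ends right after the warning, so the sketch only reports that the timeout branch was taken rather than modelling what the real code does next.

```java
// Hypothetical, simplified trace of the check order in HeartbeatRequestManager.poll()
// after this change. PollTimer and Outcome are stand-ins, not Kafka classes, and the
// boolean parameters replace calls on coordinatorRequestManager and membershipManager.
public class HeartbeatPollSketch {

    // Minimal deadline timer: expired once the last observed time reaches the deadline.
    public static final class PollTimer {
        private final long deadlineMs;
        private long currentTimeMs;

        public PollTimer(long startMs, long timeoutMs) {
            this.currentTimeMs = startMs;
            this.deadlineMs = startMs + timeoutMs;
        }

        public void update(long nowMs) {
            currentTimeMs = Math.max(currentTimeMs, nowMs);
        }

        public boolean isExpired() {
            return currentTimeMs >= deadlineMs;
        }
    }

    public enum Outcome { SKIPPED, POLL_TIMEOUT_EXPIRED, PROCEED }

    public static Outcome poll(boolean coordinatorKnown, boolean shouldSkipHeartbeat,
                               boolean isLeavingGroup, PollTimer pollTimer, long currentTimeMs) {
        // 1. No coordinator, or a state that never heartbeats (UNSUBSCRIBED, FATAL, STALE).
        //    An expired poll timer is no longer part of this check.
        if (!coordinatorKnown || shouldSkipHeartbeat) {
            return Outcome.SKIPPED;
        }
        // 2. Refresh the timer before reading it.
        pollTimer.update(currentTimeMs);
        // 3. Expiry is acted on only if the member is not already leaving; the real code
        //    logs the max.poll.interval.ms warning here (what follows is not in the hunk).
        if (pollTimer.isExpired() && !isLeavingGroup) {
            return Outcome.POLL_TIMEOUT_EXPIRED;
        }
        // 4. Otherwise fall through to normal heartbeat handling, e.g. the last
        //    heartbeat of a member that is already leaving.
        return Outcome.PROCEED;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // the max.poll.interval.ms default
        PollTimer timer = new PollTimer(0L, maxPollIntervalMs);
        System.out.println(poll(true, false, false, timer, 1_000L));   // PROCEED
        System.out.println(poll(true, false, true, timer, 400_000L));  // PROCEED (leaving)
        System.out.println(poll(true, false, false, timer, 400_000L)); // POLL_TIMEOUT_EXPIRED
    }
}
```

Dropping `pollTimer.isExpired()` from the first condition means an expired timer no longer just skips the request. The timer is refreshed first, and expiry is handled only for a member that is not already in PREPARE_LEAVING or LEAVING; a leaving member continues toward its last heartbeat.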