kirktrue commented on code in PR #15375:
URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment:
   Should the `!membershipManager.isLeavingGroup()` check be a nested `if` _inside_ the `if (pollTimer.isExpired())`? That is, do we really want to continue down to line 212 if the timer is expired and the member is already leaving the group?
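
   To make the question concrete, here is a minimal sketch of the two shapes. The class name, the `Outcome` labels, and the choice to return early in the nested version are all hypothetical; the body of the real `if` block is not shown in the hunk above, so this only models which branch is taken.

```java
// Simplified stand-in for the branch under review; these are not the Kafka classes.
final class PollBranchSketch {
    // Hypothetical outcome labels, used only to make the control flow visible.
    enum Outcome { HANDLE_EXPIRED_TIMER, SKIP_REST_OF_POLL, FALL_THROUGH }

    // Shape in the diff: both checks combined into one condition.
    static Outcome combined(boolean timerExpired, boolean leavingGroup) {
        if (timerExpired && !leavingGroup) {
            return Outcome.HANDLE_EXPIRED_TIMER;
        }
        // Reached when the timer has not expired, and also when it has
        // expired while the member is already leaving the group.
        return Outcome.FALL_THROUGH;
    }

    // Alternative raised in the review: nest the leaving-group check.
    static Outcome nested(boolean timerExpired, boolean leavingGroup) {
        if (timerExpired) {
            if (!leavingGroup) {
                return Outcome.HANDLE_EXPIRED_TIMER;
            }
            // Assumption: a nested version would stop here instead of continuing.
            return Outcome.SKIP_REST_OF_POLL;
        }
        return Outcome.FALL_THROUGH;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean expired : flags) {
            for (boolean leaving : flags) {
                System.out.printf("expired=%b leaving=%b combined=%s nested=%s%n",
                        expired, leaving, combined(expired, leaving), nested(expired, leaving));
            }
        }
    }
}
```

   The two versions disagree only for `expired=true, leaving=true`, which is the case the question is about.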



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -335,8 +335,13 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return True if there hasn't been a call to consumer.poll() within the max.poll.interval.
+     * In that case, it is expected that the member will leave the group and rejoin on the next
+     * call to consumer.poll().
+     */
     @Override
-    public boolean isStaled() {
+    public boolean isStale() {

Review Comment:
   Thank you!!!!! 😄 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -335,8 +335,13 @@ public int memberEpoch() {
         return memberEpoch;
     }
 
+    /**
+     * @return True if there hasn't been a call to consumer.poll() within the max.poll.interval.
+     * In that case, it is expected that the member will leave the group and rejoin on the next
+     * call to consumer.poll().
+     */
     @Override
-    public boolean isStaled() {
+    public boolean isStale() {

Review Comment:
   Any reason we don't just expose the inner state via a `state()` method so 
that we don't have to write `isStateA`, `isStateB`, `isStateC`, etc.?
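
   A rough sketch of the trade-off, assuming a cut-down `MemberState` enum with only three values. The `Member` class and `transitionTo` are hypothetical helpers for illustration, not the real `MembershipManagerImpl`.

```java
// Illustrative only: a cut-down model of the two accessor styles.
final class StateAccessorSketch {
    // Hypothetical subset of states; the real enum has more values.
    enum MemberState { STABLE, LEAVING, STALE }

    static final class Member {
        private MemberState state = MemberState.STABLE;

        // Hypothetical helper so the example can change state.
        void transitionTo(MemberState next) {
            state = next;
        }

        // Style 1: one accessor, callers compare against the enum.
        MemberState state() {
            return state;
        }

        // Style 2: one predicate per state of interest.
        boolean isStale() {
            return state == MemberState.STALE;
        }

        boolean isLeavingGroup() {
            return state == MemberState.LEAVING;
        }
    }

    public static void main(String[] args) {
        Member member = new Member();
        member.transitionTo(MemberState.STALE);
        // Both styles answer the same question here.
        System.out.println(member.state() == MemberState.STALE);
        System.out.println(member.isStale());
    }
}
```

   One possible argument for keeping predicates is that a single predicate can cover several states without callers needing to know which ones, but for one-to-one checks the single accessor avoids the growing list of methods.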



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() {
     @Override
     public void onHeartbeatRequestSent() {
         MemberState state = state();
-        if (isStaled()) {
-            log.debug("Member {} is staled and is therefore leaving the group.  It will rejoin upon the next poll.", memberEpoch);
-            // TODO: Integrate partition revocation/loss callback
-            transitionToJoining();
-            return;
-        }
-

Review Comment:
   For the uninitiated (like myself), would you consider adding a really brief 
comment that provides pointers to where some of the other states (e.g. `STALE`) 
are handled?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
##########
@@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager {
     MemberState state();
 
     /**
-     * @return True if the member is staled due to expired poll timer.
+     * @return True if the poll timer expired, indicating that there hasn't been a call to
+     * consumer poll within the max poll interval. In this case, the member will proactively
+     * leave the group, and rejoin on the next call to poll.
      */
-    boolean isStaled();
+    boolean isStale();

Review Comment:
   Thank you!!!!! 😄
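
   For readers new to this code, the behaviour the updated Javadoc describes can be sketched roughly as below. Everything here is an assumption made for illustration: the class, the three-value `State` enum, the method names, and the `>=` expiry check are not taken from the PR.

```java
// Toy model of the lifecycle described in the Javadoc; not the Kafka implementation.
final class StaleMemberSketch {
    // Hypothetical subset of member states.
    enum State { STABLE, STALE, JOINING }

    private final long maxPollIntervalMs;
    private long lastPollMs;
    private State state = State.STABLE;

    StaleMemberSketch(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Background path: if no poll arrived within the interval, the member
    // proactively leaves the group and is considered stale.
    void onHeartbeatTick(long nowMs) {
        if (state == State.STABLE && nowMs - lastPollMs >= maxPollIntervalMs) {
            state = State.STALE;
        }
    }

    // Application path: the next call to poll restarts the timer and
    // triggers a rejoin if the member had gone stale.
    void onConsumerPoll(long nowMs) {
        lastPollMs = nowMs;
        if (state == State.STALE) {
            state = State.JOINING;
        }
    }

    boolean isStale() {
        return state == State.STALE;
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        StaleMemberSketch member = new StaleMemberSketch(300, 0);
        member.onHeartbeatTick(300);   // interval elapsed without a poll
        System.out.println(member.isStale());
        member.onConsumerPoll(350);    // next poll triggers the rejoin
        System.out.println(member.state());
    }
}
```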


