lucasbru commented on code in PR #15372: URL: https://github.com/apache/kafka/pull/15372#discussion_r1500478736
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -232,13 +232,22 @@ public MembershipManager membershipManager() { * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not * responsive to changes. * + * Similarly, we may have to unblock the application thread to send a `PollApplicationEvent` to make sure + * our poll timer will not expire while we are polling. + * * <p>In the event that heartbeats are currently being skipped, this still returns the next heartbeat * delay rather than {@code Long.MAX_VALUE} so that the application thread remains responsive. */ @Override public long maximumTimeToWait(long currentTimeMs) { - boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); - return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); + pollTimer.update(currentTimeMs); + if ( + pollTimer.isExpired() || + (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight()) + ) { + return 0L; + } + return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); Review Comment: It's somewhat arbitrary. We need to make sure that the application thread doesn't block so long that the poll timer expires. We want to let it unblock sometime before the timer expires, and send an event to the background thread that it's still polling, and give the background thread time to process the event and reset the poll timer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org