AndrewJSchofield commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1411721121
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ########## @@ -205,6 +205,26 @@ public void wakeup() { networkClientDelegate.wakeup(); } + /** + * Returns the delay for which the application thread can safely wait before it should be responsive + * to results from the request managers. For example, the subscription state can change when heartbeats + * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not + * responsive to changes. + * + * @return The maximum delay in milliseconds + */ + public long maximumTimeToWait() { + final long currentTimeMs = time.milliseconds(); + if (requestManagers == null) { + return MAX_POLL_TIMEOUT_MS; + } + return requestManagers.entries().stream() + .filter(Optional::isPresent) + .map(Optional::get) + .map(rm -> rm.maximumTimeToWait(currentTimeMs)) + .reduce(Long.MAX_VALUE, Math::min); + } + Review Comment: Something like this would work I think. The aim after all is to keep things responsive rather than an accurate regular tick. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org