AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1411721121


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -205,6 +205,26 @@ public void wakeup() {
             networkClientDelegate.wakeup();
     }
 
+    /**
+     * Returns the delay for which the application thread can safely wait 
before it should be responsive
+     * to results from the request managers. For example, the subscription 
state can change when heartbeats
+     * are sent, so blocking for longer than the heartbeat interval might mean 
the application thread is not
+     * responsive to changes.
+     *
+     * @return The maximum delay in milliseconds
+     */
+    public long maximumTimeToWait() {
+        final long currentTimeMs = time.milliseconds();
+        if (requestManagers == null) {
+            return MAX_POLL_TIMEOUT_MS;
+        }
+        return requestManagers.entries().stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(rm -> rm.maximumTimeToWait(currentTimeMs))
+                .reduce(Long.MAX_VALUE, Math::min);
+    }
+

Review Comment:
   Something like this would work I think. The aim after all is to keep things 
responsive rather than an accurate regular tick.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to