AndrewJSchofield commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1411833816
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -205,6 +205,26 @@ public void wakeup() {
networkClientDelegate.wakeup();
}
+ /**
+ * Returns the delay for which the application thread can safely wait before it should be responsive
+ * to results from the request managers. For example, the subscription state can change when heartbeats
+ * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
+ * responsive to changes.
+ *
+ * @return The maximum delay in milliseconds
+ */
+ public long maximumTimeToWait() {
+ final long currentTimeMs = time.milliseconds();
+ if (requestManagers == null) {
+ return MAX_POLL_TIMEOUT_MS;
+ }
+ return requestManagers.entries().stream()
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(rm -> rm.maximumTimeToWait(currentTimeMs))
+ .reduce(Long.MAX_VALUE, Math::min);
+ }
+
Review Comment:
I'll take this concept and work on it in a follow-on PR.
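
For reference, the reduction in the hunk above can be sketched in isolation as below. This is only an illustration under stated assumptions, not the PR's code: `WaitAware`, `MaxWaitSketch`, and the 5000 ms value for `MAX_POLL_TIMEOUT_MS` are hypothetical stand-ins for the real request-manager types and constant.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class MaxWaitSketch {
    // Hypothetical stand-in for a request manager; not the real Kafka interface.
    interface WaitAware {
        long maximumTimeToWait(long currentTimeMs);
    }

    // Assumed value for illustration only.
    static final long MAX_POLL_TIMEOUT_MS = 5000L;

    // Mirrors the shape of the method in the hunk: take the smallest wait
    // reported by any present manager, skipping empty Optionals.
    static long maximumTimeToWait(List<Optional<WaitAware>> managers, long currentTimeMs) {
        if (managers == null) {
            return MAX_POLL_TIMEOUT_MS;
        }
        return managers.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                .reduce(Long.MAX_VALUE, Math::min);
    }

    public static void main(String[] args) {
        WaitAware heartbeat = now -> 3000L; // e.g. time until the next heartbeat
        WaitAware fetch = now -> 500L;      // e.g. a shorter backoff
        List<Optional<WaitAware>> managers = Arrays.asList(
                Optional.of(heartbeat),
                Optional.<WaitAware>empty(),
                Optional.of(fetch));
        System.out.println(maximumTimeToWait(managers, 0L));
    }
}
```

In this sketch, a list containing only empty `Optional`s reduces to `Long.MAX_VALUE` rather than `MAX_POLL_TIMEOUT_MS`, so a caller would probably want to cap the result itself.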