kirktrue commented on code in PR #14835:
URL: https://github.com/apache/kafka/pull/14835#discussion_r1411470319


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -205,6 +205,26 @@ public void wakeup() {
             networkClientDelegate.wakeup();
     }
 
+    /**
+     * Returns the delay for which the application thread can safely wait before it should be responsive
+     * to results from the request managers. For example, the subscription state can change when heartbeats
+     * are sent, so blocking for longer than the heartbeat interval might mean the application thread is not
+     * responsive to changes.
+     *
+     * @return The maximum delay in milliseconds
+     */
+    public long maximumTimeToWait() {
+        final long currentTimeMs = time.milliseconds();
+        if (requestManagers == null) {
+            return MAX_POLL_TIMEOUT_MS;
+        }
+        return requestManagers.entries().stream()
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .map(rm -> rm.maximumTimeToWait(currentTimeMs))
+                .reduce(Long.MAX_VALUE, Math::min);
+    }
+

Review Comment:
   I'm wondering how "fresh" the "maximum time to wait" value needs to be...
   
   Spitballing here, but... say we introduce a new instance variable in 
`ConsumerNetworkThread` that stores a cached value named, say, 
`notTotallyFreshMaxWaitCached`? We could then invoke `maximumTimeToWait()` from 
inside `runOnce()` and update `notTotallyFreshMaxWaitCached`, like so:
   
   ```java
       private volatile long notTotallyFreshMaxWaitCached = MAX_POLL_TIMEOUT_MS;
   
       void runOnce() {
           // If there are errors processing any events, the error will be thrown immediately. This will have
           // the effect of closing the background thread.
           applicationEventProcessor.process();
   
           final long currentTimeMs = time.milliseconds();
           final long pollWaitTimeMs = requestManagers.entries().stream()
                   .filter(Optional::isPresent)
                   .map(Optional::get)
                   .map(rm -> rm.poll(currentTimeMs))
                   .map(networkClientDelegate::addAll)
                   .reduce(MAX_POLL_TIMEOUT_MS, Math::min);
           networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
   
           notTotallyFreshMaxWaitCached = requestManagers.entries().stream()
                   .filter(Optional::isPresent)
                   .map(Optional::get)
                   .map(rm -> rm.maximumTimeToWait(currentTimeMs))
                   .reduce(Long.MAX_VALUE, Math::min);
       }
   
       public long maximumTimeToWait() {
           if (requestManagers == null) {
               return MAX_POLL_TIMEOUT_MS;
           }
           return notTotallyFreshMaxWaitCached;
       }
   ```
   
   (Or something like that.)
   
   This way we still only touch the `RequestManager`s inside `runOnce()`, so we 
don't open ourselves up to subtle concurrency issues.
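   
   To make the threading contract concrete, here is a stripped-down, self-contained sketch of the same idea outside of Kafka. Everything in it is hypothetical: `CachedMaxWaitSketch` stands in for `ConsumerNetworkThread`, each `LongUnaryOperator` stands in for a `RequestManager#maximumTimeToWait(currentTimeMs)` call, and the `MAX_POLL_TIMEOUT_MS` value is assumed for illustration only.
   
   ```java
   import java.util.List;
   import java.util.function.LongUnaryOperator;
   
   // Hypothetical stand-in for ConsumerNetworkThread; not the actual Kafka class.
   class CachedMaxWaitSketch {
       // Assumed value, for illustration only.
       static final long MAX_POLL_TIMEOUT_MS = 5000L;
   
       // Each operator stands in for RequestManager#maximumTimeToWait(currentTimeMs).
       private final List<LongUnaryOperator> managers;
   
       // Written only by the background thread, read by the application thread.
       // volatile guarantees visibility and atomic reads/writes of the long.
       private volatile long cachedMaxWaitMs = MAX_POLL_TIMEOUT_MS;
   
       CachedMaxWaitSketch(List<LongUnaryOperator> managers) {
           this.managers = managers;
       }
   
       // Background thread only: the sole place the managers are touched.
       void runOnce(long currentTimeMs) {
           long min = Long.MAX_VALUE;
           for (LongUnaryOperator manager : managers) {
               min = Math.min(min, manager.applyAsLong(currentTimeMs));
           }
           cachedMaxWaitMs = min;
       }
   
       // Any thread: a single volatile read, possibly one runOnce() pass stale.
       long maximumTimeToWait() {
           return cachedMaxWaitMs;
       }
   }
   ```
   
   The trade-off is that the application thread may see a value that is up to one `runOnce()` iteration old, in exchange for never calling into the managers from a second thread.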
   
   Thoughts?


