kirktrue commented on code in PR #19609:
URL: https://github.com/apache/kafka/pull/19609#discussion_r2072110596


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -154,16 +153,12 @@ void runOnce() {
         lastPollTimeMs = currentTimeMs;
 
         final long pollWaitTimeMs = requestManagers.entries().stream()
-                .filter(Optional::isPresent)
-                .map(Optional::get)
                 .map(rm -> rm.poll(currentTimeMs))
                 .map(networkClientDelegate::addAll)
                 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);

Review Comment:
   I made a change here, but instead of:
   
   ```java
           final long pollWaitTimeMs = requestManagers.entries().stream()
                   .map(rm -> rm.poll(currentTimeMs))
                   .mapToLong(networkClientDelegate::addAll)
                   .min()
                   .orElse(MAX_POLL_TIMEOUT_MS);
   ```
   
   I had to add a call to `filter()`:
   
   ```java
           final long pollWaitTimeMs = requestManagers.entries().stream()
                   .map(rm -> rm.poll(currentTimeMs))
                   .mapToLong(networkClientDelegate::addAll)
                   .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
                   .min()
                   .orElse(MAX_POLL_TIMEOUT_MS);
   ```
   
   The stream must contain only values less than or equal to the maximum 
(`MAX_POLL_TIMEOUT_MS`) when `min()` is invoked; otherwise we could end up 
with a value that's greater than our maximum. Unlike the identity passed to 
`reduce()`, which also capped the result, `orElse()` only supplies the default 
when the stream is empty. Fortunately, there is a unit test that caught that 
little wrinkle 😄
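
To make the difference concrete, here is a minimal standalone sketch, not the actual Kafka code. The class `PollWaitDemo`, its method names, and the `5000L` value used for `MAX_POLL_TIMEOUT_MS` are assumptions for illustration only, and a plain list of longs stands in for the values returned by `networkClientDelegate::addAll`.

```java
import java.util.List;

public class PollWaitDemo {
    // Assumed value for illustration; the real constant is defined in ConsumerNetworkThread.
    static final long MAX_POLL_TIMEOUT_MS = 5000L;

    // Original shape: the identity passed to reduce() takes part in every
    // Math::min comparison, so it also acts as an upper bound on the result.
    static long viaReduce(List<Long> waits) {
        return waits.stream().reduce(MAX_POLL_TIMEOUT_MS, Math::min);
    }

    // Naive rewrite: orElse() is used only when the stream is empty,
    // so a stream of large values yields a result above the maximum.
    static long viaMinUncapped(List<Long> waits) {
        return waits.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(MAX_POLL_TIMEOUT_MS);
    }

    // Rewrite with filter(): values above the maximum are dropped before min(),
    // so the result is either a value within bounds or the orElse() default.
    static long viaMinFiltered(List<Long> waits) {
        return waits.stream()
                .mapToLong(Long::longValue)
                .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
                .min()
                .orElse(MAX_POLL_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        // Every value exceeds the assumed maximum.
        List<Long> waits = List.of(Long.MAX_VALUE, 7000L);
        System.out.println(viaReduce(waits));       // 5000
        System.out.println(viaMinUncapped(waits));  // 7000
        System.out.println(viaMinFiltered(waits));  // 5000
    }
}
```

When at least one value is already at or below the maximum, all three variants agree; they diverge only when every value in the stream exceeds it.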



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

