kirktrue commented on code in PR #19609: URL: https://github.com/apache/kafka/pull/19609#discussion_r2072110596
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########
@@ -154,16 +153,12 @@ void runOnce() {
         lastPollTimeMs = currentTimeMs;

         final long pollWaitTimeMs = requestManagers.entries().stream()
-                .filter(Optional::isPresent)
-                .map(Optional::get)
                 .map(rm -> rm.poll(currentTimeMs))
                 .map(networkClientDelegate::addAll)
                 .reduce(MAX_POLL_TIMEOUT_MS, Math::min);

Review Comment:
   I made a change here, but instead of:

   ```java
   final long pollWaitTimeMs = requestManagers.entries().stream()
           .map(rm -> rm.poll(currentTimeMs))
           .mapToLong(networkClientDelegate::addAll)
           .min()
           .orElse(MAX_POLL_TIMEOUT_MS);
   ```

   I had to add a call to `filter()`:

   ```java
   final long pollWaitTimeMs = requestManagers.entries().stream()
           .map(rm -> rm.poll(currentTimeMs))
           .mapToLong(networkClientDelegate::addAll)
           .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
           .min()
           .orElse(MAX_POLL_TIMEOUT_MS);
   ```

   The stream must contain only values less than or equal to the maximum (`MAX_POLL_TIMEOUT_MS`) when `min()` is invoked; otherwise we could end up with a value that's greater than our maximum. The original `reduce(MAX_POLL_TIMEOUT_MS, Math::min)` capped the result implicitly because the maximum was the reduction's identity value, whereas `orElse(MAX_POLL_TIMEOUT_MS)` only applies when the stream is empty. Fortunately there is a unit test that caught that little wrinkle 😄
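To make the difference between the three forms concrete, here is a minimal standalone sketch. It is not the Kafka code: the class name, the method names, and the value chosen for `MAX_POLL_TIMEOUT_MS` are all assumptions for illustration, and a plain `List<Long>` stands in for the wait times returned by `networkClientDelegate::addAll`.

```java
import java.util.List;

public class PollWaitSketch {
    // Hypothetical stand-in for the real constant; the value is an assumption.
    static final long MAX_POLL_TIMEOUT_MS = 5000L;

    // Original form: the maximum is the reduction identity, so it also caps the result.
    static long viaReduce(List<Long> waits) {
        return waits.stream().reduce(MAX_POLL_TIMEOUT_MS, Math::min);
    }

    // Rewrite without filter(): the fallback is used only when the stream is empty,
    // so a non-empty stream of large values yields a result above the maximum.
    static long viaMinOnly(List<Long> waits) {
        return waits.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(MAX_POLL_TIMEOUT_MS);
    }

    // Rewrite with filter(): values above the maximum are dropped before min(),
    // so the result can never exceed the maximum.
    static long viaFilteredMin(List<Long> waits) {
        return waits.stream()
                .mapToLong(Long::longValue)
                .filter(ms -> ms <= MAX_POLL_TIMEOUT_MS)
                .min()
                .orElse(MAX_POLL_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        // Every wait time exceeds the maximum.
        List<Long> waits = List.of(Long.MAX_VALUE, 60_000L);
        System.out.println(viaReduce(waits));      // prints 5000
        System.out.println(viaMinOnly(waits));     // prints 60000
        System.out.println(viaFilteredMin(waits)); // prints 5000
    }
}
```

The three forms agree whenever at least one value is at or below the maximum, or when the list is empty; they diverge only when every value is above it, which is presumably the case the unit test exercised.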