hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514584469

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) throws IOException {
         }
     }
 
+    private void pollListeners() {
+        // Register any listeners added since the last poll
+        while (!pendingListeners.isEmpty()) {
+            Listener<T> listener = pendingListeners.poll();
+            listenerContexts.add(new ListenerContext(listener));
+        }
+
+        // Check listener progress to see if reads are expected
+        quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
+            long highWatermark = highWatermarkMetadata.offset;
+
+            List<ListenerContext> listenersToUpdate = listenerContexts.stream()
+                .filter(listenerContext -> {
+                    OptionalLong nextExpectedOffset = listenerContext.nextExpectedOffset();
+                    return nextExpectedOffset.isPresent() && nextExpectedOffset.getAsLong() < highWatermark;
+                })
+                .collect(Collectors.toList());
+
+            maybeFireHandleCommit(listenersToUpdate, highWatermarkMetadata.offset);
+        });
+    }
+
     public void poll() throws IOException {
         GracefulShutdown gracefulShutdown = shutdown.get();
         if (gracefulShutdown != null) {
             pollShutdown(gracefulShutdown);
         } else {
+            pollListeners();

Review comment:
Hmm, that's a fair question. I think the listeners will tend to get new data in two cases: 1) the high watermark advanced, or 2) a previous read completed. In the first case, the high watermark only advances in response to a request, so there should be no delay. In the second case, we call `wakeup()` to take us out of the network poll, so I think there should be no delay there either. Can you think of a case where there would be a delay?
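
To make the selection step in `pollListeners()` concrete, here is a minimal standalone sketch of the filter. It is not the PR's code: `ListenerPollSketch` and its nested `ListenerContext` are hypothetical stand-ins, and the sketch assumes that an empty `nextExpectedOffset()` means no read is currently expected for that listener.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.stream.Collectors;

class ListenerPollSketch {
    /** Hypothetical stand-in for the PR's ListenerContext; only tracks an offset. */
    static final class ListenerContext {
        final String name;
        private final OptionalLong nextExpectedOffset;

        ListenerContext(String name, OptionalLong nextExpectedOffset) {
            this.name = name;
            this.nextExpectedOffset = nextExpectedOffset;
        }

        // Assumption: empty means no read is expected right now.
        OptionalLong nextExpectedOffset() {
            return nextExpectedOffset;
        }
    }

    /** Select listeners that expect a read and are still below the high watermark. */
    static List<ListenerContext> listenersToUpdate(List<ListenerContext> contexts, long highWatermark) {
        return contexts.stream()
            .filter(ctx -> {
                OptionalLong next = ctx.nextExpectedOffset();
                return next.isPresent() && next.getAsLong() < highWatermark;
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<ListenerContext> contexts = Arrays.asList(
            new ListenerContext("behind", OptionalLong.of(5)),          // below the high watermark
            new ListenerContext("caught-up", OptionalLong.of(10)),      // already at the high watermark
            new ListenerContext("no-read-expected", OptionalLong.empty()));

        // With a high watermark of 10, only "behind" is selected.
        for (ListenerContext ctx : listenersToUpdate(contexts, 10L)) {
            System.out.println(ctx.name);
        }
    }
}
```

Under these assumptions, a listener is picked up again on the next `poll()` only once one of the two conditions from the comment changes its state: the high watermark moves past its next expected offset, or its next expected offset becomes present again.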