hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514584469



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) 
throws IOException {
         }
     }
 
+    private void pollListeners() {
+        // Register any listeners added since the last poll
+        while (!pendingListeners.isEmpty()) {
+            Listener<T> listener = pendingListeners.poll();
+            listenerContexts.add(new ListenerContext(listener));
+        }
+
+        // Check listener progress to see if reads are expected
+        quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
+            long highWatermark = highWatermarkMetadata.offset;
+
+            List<ListenerContext> listenersToUpdate = listenerContexts.stream()
+                .filter(listenerContext -> {
+                    OptionalLong nextExpectedOffset = 
listenerContext.nextExpectedOffset();
+                    return nextExpectedOffset.isPresent() && 
nextExpectedOffset.getAsLong() < highWatermark;
+                })
+                .collect(Collectors.toList());
+
+            maybeFireHandleCommit(listenersToUpdate, 
highWatermarkMetadata.offset);
+        });
+    }
+
     public void poll() throws IOException {
         GracefulShutdown gracefulShutdown = shutdown.get();
         if (gracefulShutdown != null) {
             pollShutdown(gracefulShutdown);
         } else {
+            pollListeners();

Review comment:
       Hmm, that's a fair question. I think the listeners will tend to get new 
data in two cases: 1) high watermark advanced, or 2) a previous read completes. 
In the first case, the high watermark only advances in response to a request, 
so there should be no delay. In the second case, we call `wakeup()` to take us 
out of the network poll, so I think there also should be no delay. Can you 
think of a case where there would be a delay?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to