cmccabe commented on code in PR #16626:
URL: https://github.com/apache/kafka/pull/16626#discussion_r1684605221


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -348,22 +360,29 @@ private void onUpdateLeaderHighWatermark(
     private void updateListenersProgress(long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset 
-> {
-                // Send snapshot to the listener, if the listener is at the 
beginning of the log and there is a snapshot,
-                // or the listener is trying to read an offset for which there 
isn't a segment in the log.
+                // Send snapshot to the listener, if there is a snapshot for 
the partition,
+                // and it is a new listener or
+                // the listener is trying to read an offset for which there 
isn't a segment in the log.
                 if (nextExpectedOffset < highWatermark &&
-                    ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) 
||
-                     nextExpectedOffset < log.startOffset())
+                    ((nextExpectedOffset == 
ListenerContext.STARTING_NEXT_OFFSET || nextExpectedOffset < log.startOffset())
+                     && latestSnapshot().isPresent())
                 ) {
-                    SnapshotReader<T> snapshot = 
latestSnapshot().orElseThrow(() -> new IllegalStateException(
+                    listenerContext.fireHandleSnapshot(latestSnapshot().get());
+                } else if (nextExpectedOffset == 
ListenerContext.STARTING_NEXT_OFFSET) {
+                    // Reset the next offset to 0 since it is a new listener 
context and there is no
+                    // bootstraping checkpoint
+                    listenerContext.resetOffsetToLogStart();

Review Comment:
   does it make sense to have a log message here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to