hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510455713



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                return;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset < highWatermark) {
+                LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+                listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+            }
+        }
+    }
+
+    private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                return;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset == baseOffset) {
+                listenerContext.fireHandleCommit(baseOffset, epoch, records);
+            }
+        }
+    }
+
+    private void maybeFireHandleClaim(LeaderState state) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            int leaderEpoch = state.epoch();
+
+            // We can fire `handleClaim` as soon as the listener has caught
+            // up to the start of the leader epoch. This guarantees that the
+            // state machine has seen the full committed state before it becomes
+            // leader and begins writing to the log.

Review comment:
       I thought a little about it. Right now the state machine has just two states: 1) I am not a leader, and 2) I am a leader and have caught up with all committed data from previous epochs. An alternative design is to fire `handleClaim` immediately and provide the starting offset of the leader epoch. Then the controller can wait until its state machine has caught up to that offset before starting to write data. In the end, I decided not to do it because it adds a third state, and I did not expect the controller would be able to do anything useful in that additional state. The point about heartbeats is interesting, but even that seems tricky since the controller would not know whether a broker had been fenced until it had caught up. I think the only thing the controller could do is hold the requests in purgatory, which might be better than letting them retry, but I'm not sure it's worth it.
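
For illustration, here is a minimal sketch of that rejected alternative, not code from this PR: `handleClaim` fires immediately and carries the starting offset of the leader epoch, and the listener defers writes until its state machine has replayed everything up to that offset. The `Listener` interface and the `epochStartOffset` parameter below are hypothetical simplifications, not the actual raft listener API:

```java
// Sketch of the rejected alternative: handleClaim fires immediately with a
// hypothetical epochStartOffset, and the listener starts writing only once
// its state machine has caught up to that offset.
import java.util.List;

interface Listener<T> {
    // Hypothetical signature; the design in this PR instead fires handleClaim
    // only after the listener has caught up, with no offset argument.
    void handleClaim(int epoch, long epochStartOffset);

    void handleCommit(long baseOffset, int epoch, List<T> records);
}

class DeferredWriteController implements Listener<String> {
    private long claimedEpochStartOffset = -1L;
    private long nextExpectedOffset = 0L;
    private boolean readyToWrite = false;

    @Override
    public void handleClaim(int epoch, long epochStartOffset) {
        // This is the third state: we are the leader, but we may not yet
        // have seen all committed data from previous epochs.
        this.claimedEpochStartOffset = epochStartOffset;
        maybeBecomeReady();
    }

    @Override
    public void handleCommit(long baseOffset, int epoch, List<String> records) {
        this.nextExpectedOffset = baseOffset + records.size();
        maybeBecomeReady();
    }

    private void maybeBecomeReady() {
        // Begin writing only after replaying all data committed before our epoch.
        if (claimedEpochStartOffset >= 0 && nextExpectedOffset >= claimedEpochStartOffset) {
            readyToWrite = true;
        }
    }
}
```

The `readyToWrite` flag is exactly the extra state the comment above argues is not worth adding, since the controller cannot do much useful work while it is false.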



