jsancio commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510469742
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { - updateHighWatermark(state, currentTimeMs); + onUpdateLeaderHighWatermark(state, currentTimeMs); } - LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); - fetchPurgatory.maybeComplete(endOffset, currentTimeMs); + fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } - private void updateHighWatermark( - EpochState state, + private void onUpdateLeaderHighWatermark( + LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { - logger.debug("High watermark updated to {}", highWatermark); + logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - - LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); - appendPurgatory.maybeComplete(offset, currentTimeMs); - fetchPurgatory.maybeComplete(offset, currentTimeMs); + appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); + maybeFireHandleCommit(highWatermark.offset); }); } - @Override - public LeaderAndEpoch currentLeaderAndEpoch() { - return quorum.leaderAndEpoch(); + private void maybeFireHandleCommit(long highWatermark) { + maybeFireHandleCommit(listenerContexts, highWatermark); + } + + private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) { + // TODO: When there are multiple listeners, we can cache reads to save some work + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; + } + + long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); + if (nextExpectedOffset < highWatermark) { 
+ LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); + listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); + } + } + } + + private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) { + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; + } + + long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); + if (nextExpectedOffset == baseOffset) { + listenerContext.fireHandleCommit(baseOffset, epoch, records); + } + } + } + + private void maybeFireHandleClaim(LeaderState state) { + for (ListenerContext listenerContext : listenerContexts) { + int leaderEpoch = state.epoch(); + + // We can fire `handleClaim` as soon as the listener has caught + // up to the start of the leader epoch. This guarantees that the + // state machine has seen the full committed state before it becomes + // leader and begins writing to the log. Review comment: Yeah, I was thinking the same thing: "hold the requests in purgatory." But, as you said, this optimization may not be worth the added complexity. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org