hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510455713
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { - updateHighWatermark(state, currentTimeMs); + onUpdateLeaderHighWatermark(state, currentTimeMs); } - LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); - fetchPurgatory.maybeComplete(endOffset, currentTimeMs); + fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } - private void updateHighWatermark( - EpochState state, + private void onUpdateLeaderHighWatermark( + LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { - logger.debug("High watermark updated to {}", highWatermark); + logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - - LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); - appendPurgatory.maybeComplete(offset, currentTimeMs); - fetchPurgatory.maybeComplete(offset, currentTimeMs); + appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); + maybeFireHandleCommit(highWatermark.offset); }); } - @Override - public LeaderAndEpoch currentLeaderAndEpoch() { - return quorum.leaderAndEpoch(); + private void maybeFireHandleCommit(long highWatermark) { + maybeFireHandleCommit(listenerContexts, highWatermark); + } + + private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) { + // TODO: When there are multiple listeners, we can cache reads to save some work + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; + } + + long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); + if (nextExpectedOffset < highWatermark) { 
+ LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); + listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); + } + } + } + + private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) { + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; + } + + long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); + if (nextExpectedOffset == baseOffset) { + listenerContext.fireHandleCommit(baseOffset, epoch, records); + } + } + } + + private void maybeFireHandleClaim(LeaderState state) { + for (ListenerContext listenerContext : listenerContexts) { + int leaderEpoch = state.epoch(); + + // We can fire `handleClaim` as soon as the listener has caught + // up to the start of the leader epoch. This guarantees that the + // state machine has seen the full committed state before it becomes + // leader and begins writing to the log. Review comment: I thought a little more about it. Right now the state machine has just two states: 1) I am not a leader, and 2) I am a leader and have caught up with all committed data from previous epochs. An alternative design is to fire `handleClaim` immediately and provide the starting offset of the leader epoch; the controller could then wait until its state machine has caught up to that offset before starting to write data. In the end, I decided not to do it because it adds a third state, and I did not expect the controller to be able to do anything useful in that additional state. The point about heartbeats is interesting, but even that seems tricky, since the controller would not know whether a broker had been fenced until it has caught up. I think the only thing the controller could do is hold the requests in purgatory, which might be better than letting them retry, but I'm not sure it's worth it.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org