jsancio commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r510385486
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { - updateHighWatermark(state, currentTimeMs); + onUpdateLeaderHighWatermark(state, currentTimeMs); } - LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); - fetchPurgatory.maybeComplete(endOffset, currentTimeMs); + fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } - private void updateHighWatermark( - EpochState state, + private void onUpdateLeaderHighWatermark( + LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { - logger.debug("High watermark updated to {}", highWatermark); + logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - - LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); - appendPurgatory.maybeComplete(offset, currentTimeMs); - fetchPurgatory.maybeComplete(offset, currentTimeMs); + appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); + maybeFireHandleCommit(highWatermark.offset); }); } - @Override - public LeaderAndEpoch currentLeaderAndEpoch() { - return quorum.leaderAndEpoch(); + private void maybeFireHandleCommit(long highWatermark) { + maybeFireHandleCommit(listenerContexts, highWatermark); + } + + private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) { + // TODO: When there are multiple listeners, we can cache reads to save some work Review comment: I would remove this comment. We can file an issue and fix it if this becomes a performance issue. 
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { - updateHighWatermark(state, currentTimeMs); + onUpdateLeaderHighWatermark(state, currentTimeMs); } - LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); - fetchPurgatory.maybeComplete(endOffset, currentTimeMs); + fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } - private void updateHighWatermark( - EpochState state, + private void onUpdateLeaderHighWatermark( + LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { - logger.debug("High watermark updated to {}", highWatermark); + logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - - LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); - appendPurgatory.maybeComplete(offset, currentTimeMs); - fetchPurgatory.maybeComplete(offset, currentTimeMs); + appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); + maybeFireHandleCommit(highWatermark.offset); }); } - @Override - public LeaderAndEpoch currentLeaderAndEpoch() { - return quorum.leaderAndEpoch(); + private void maybeFireHandleCommit(long highWatermark) { + maybeFireHandleCommit(listenerContexts, highWatermark); + } + + private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) { + // TODO: When there are multiple listeners, we can cache reads to save some work + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; Review comment: Why a `return`? Did you mean to use `continue`? 
If this is supposed to be a `continue`, then maybe we can use `Optional.ifPresent`. The same comment applies to one of the other overloaded `maybeFireHandleCommit` methods. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -975,12 +1029,9 @@ private boolean handleFetchResponse( log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> { logger.info("Truncated to offset {} from Fetch response from leader {}", truncationOffset, quorum.leaderIdOrNil()); - - // Since the end offset has been updated, we should complete any delayed - // reads at the end offset. - fetchPurgatory.maybeComplete( - new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED), - currentTimeMs); + // After truncation, we complete all pending reads in order to + // ensure that fetches account for the Review comment: Incomplete sentence. ########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { - updateHighWatermark(state, currentTimeMs); + onUpdateLeaderHighWatermark(state, currentTimeMs); } - LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); - fetchPurgatory.maybeComplete(endOffset, currentTimeMs); + fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } - private void updateHighWatermark( - EpochState state, + private void onUpdateLeaderHighWatermark( + LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { - logger.debug("High watermark updated to {}", highWatermark); + logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - - LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); - appendPurgatory.maybeComplete(offset, currentTimeMs); - 
fetchPurgatory.maybeComplete(offset, currentTimeMs); + appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); + maybeFireHandleCommit(highWatermark.offset); }); } - @Override - public LeaderAndEpoch currentLeaderAndEpoch() { - return quorum.leaderAndEpoch(); + private void maybeFireHandleCommit(long highWatermark) { + maybeFireHandleCommit(listenerContexts, highWatermark); + } + + private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) { + // TODO: When there are multiple listeners, we can cache reads to save some work + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; + } + + long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); + if (nextExpectedOffset < highWatermark) { + LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); + listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); + } + } + } + + private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) { + for (ListenerContext listenerContext : listenerContexts) { + OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); + if (!nextExpectedOffsetOpt.isPresent()) { + return; + } + + long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); + if (nextExpectedOffset == baseOffset) { + listenerContext.fireHandleCommit(baseOffset, epoch, records); + } + } + } + + private void maybeFireHandleClaim(LeaderState state) { + for (ListenerContext listenerContext : listenerContexts) { + int leaderEpoch = state.epoch(); + + // We can fire `handleClaim` as soon as the listener has caught + // up to the start of the leader epoch. This guarantees that the + // state machine has seen the full committed state before it becomes + // leader and begins writing to the log. Review comment: Interesting. It is good to hide this logic from the state machine. 
Looking at the epoch and not at the LEO is okay because at this point we guarantee that the only records with that epoch are control records (e.g. LeaderChangedMessage). I am wondering if the state machine may want to know this before it can process state machine requests. Maybe this is okay because the brokers/replicas will learn about the new leader through the `Fetch` and `BeginQuorum` protocol and not from the state machine (Kafka Controller) itself. It is possible that the leader will receive Kafka Controller messages from replicas/brokers before it knows that it is the leader. Most likely the Kafka Controller will reject them, but the replicas/brokers need to keep retrying. This is especially important for heartbeat messages. ########## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ########## @@ -16,57 +16,75 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.record.Records; - import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; -public interface RaftClient { +public interface RaftClient<T> { + + interface Listener<T> { + /** + * Callback which is invoked for all records committed to the log. + * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. + * + * Note that there is not a one-to-one correspondence between writes through + * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation + * is free to batch together the records from multiple append calls provided + * that batch boundaries are respected. This means that each batch specified + * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated + */ + void handleCommit(BatchReader<T> reader); + + /** + * Invoked after this node has become a leader. 
This is only called after + * all commits up to the start of the leader's epoch have been sent to + * {@link #handleCommit(BatchReader)}. + * + * After becoming a leader, the client is eligible to write to the log + * using {@link #scheduleAppend(int, List)}. + * + * @param epoch the claimed leader epoch + */ + default void handleClaim(int epoch) {} + + /** + * Invoked after a leader has stepped down. This callback may or may not + * fire before the next leader has been elected. + */ + default void handleResign() {} + } /** - * Initialize the client. This should only be called once and it must be - * called before any of the other APIs can be invoked. + * Initialize the client. This should only be called once on startup. * * @throws IOException For any IO errors during initialization */ void initialize() throws IOException; /** - * Append a new entry to the log. The client must be in the leader state to - * accept an append: it is up to the state machine implementation - * to ensure this using {@link #currentLeaderAndEpoch()}. - * - * TODO: One improvement we can make here is to allow the caller to specify - * the current leader epoch in the record set. That would ensure that each - * leader change must be "observed" by the state machine before new appends - * are accepted. - * - * @param records The records to append to the log - * @param timeoutMs Maximum time to wait for the append to complete - * @return A future containing the last offset and epoch of the appended records (if successful) - */ - CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs); - - /** - * Read a set of records from the log. Note that it is the responsibility of the state machine - * to filter control records added by the Raft client itself. - * - * If the fetch offset is no longer valid, then the future will be completed exceptionally - * with a {@link LogTruncationException}. + * Register a listener to get commit/leader notifications. 
* - * @param position The position to fetch from - * @param isolation The isolation level to apply to the read - * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion - * @return The record set, which may be empty if fetching from the end of the log + * @param listener the listener */ - CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs); + void register(Listener<T> listener); /** - * Get the current leader (if known) and the current epoch. + * Append a list of records to the log. The write will be scheduled for some time + * in the future. There is no guarantee that appended records will be written to + * the log and eventually committed. However, it is guaranteed that if any of the + * records become committed, then all of them will be. * - * @return Current leader and epoch information + * @param epoch the current leader epoch + * @param records the list of records to append + * @return the offset within the current epoch that the log entries will be appended, + * or null if the leader was unable to accept the write (e.g. due to memory + * being reached). Review comment: We should also mention that `MAX_VALUE` is returned if the RaftClient is not the leader. ########## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ########## @@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) throws IOException { if (!candidateState.isVoteGranted()) throw new IllegalStateException("Cannot become leader without majority votes granted"); + // Note that the leader does not retain the high watermark that was known + // in the previous state. The purpose of this is to protect the monotonicity Review comment: What is "this" in this sentence? epochStartOffset? 
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -1757,35 +1809,86 @@ public void complete() { } } - private static class UnwrittenAppend { - private final Records records; - private final long createTimeMs; - private final long requestTimeoutMs; - private final AckMode ackMode; - private final CompletableFuture<OffsetAndEpoch> future; + private final class ListenerContext implements CloseListener<BatchReader<T>> { + private final RaftClient.Listener<T> listener; + private BatchReader<T> lastSent = null; + private long lastAckedOffset = 0; + private int claimedEpoch = 0; + + private ListenerContext(Listener<T> listener) { + this.listener = listener; + } + + /** + * Get the last acked offset, which is one greater than the offset of the + * last record which was acked by the state machine. + */ + public synchronized long lastAckedOffset() { + return lastAckedOffset; + } + + /** + * Get the next expected offset, which might be larger than the last acked + * offset if there are inflight batches which have not been acked yet. + * Note that when fetching from disk, we may not know the last offset of + * inflight data until it has been processed by the state machine. In this case, + * we delay sending additional data until the state machine has read to the + * end and the last offset is determined. Review comment: Does this mean that, in practice, followers will have at most two batches in flight? 1. The one that they are currently processing. 2. If they have read the last message/record in that batch, the next batch in the log? 
########## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ########## @@ -16,57 +16,75 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.record.Records; - import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; -public interface RaftClient { +public interface RaftClient<T> { + + interface Listener<T> { + /** + * Callback which is invoked for all records committed to the log. + * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. + * + * Note that there is not a one-to-one correspondence between writes through + * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation + * is free to batch together the records from multiple append calls provided + * that batch boundaries are respected. This means that each batch specified + * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated Review comment: nit: "... must be iterated and closed". ########## File path: raft/src/main/java/org/apache/kafka/raft/RecordSerde.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.protocol.DataOutputStreamWritable; +import org.apache.kafka.common.protocol.Readable; + +public interface RecordSerde<T> { + /** + * Create a new context object for to be used when serializing a batch of records. + * This allows for state to be shared between {@link #recordSize(Object, Object)} + * and {@link #write(Object, Object, DataOutputStreamWritable)}, which is useful + * in order to avoid redundant work (see e.g. + * {@link org.apache.kafka.common.protocol.ObjectSerializationCache}). + * + * @return context object or null if none is needed + */ + default Object newWriteContext() { + return null; + } + + /** + * Get the size of a record. + * + * @param data the record that will be serialized + * @param context context object created by {@link #newWriteContext()} + * @return the size in bytes of the serialized record + */ + int recordSize(T data, Object context); + + + /** + * Write the record to the output stream. + * + * @param data the record to serialize and write + * @param context context object created by {@link #newWriteContext()} + * @param out the output stream to write the record to + */ + void write(T data, Object context, DataOutputStreamWritable out); + + /** + * + * @param input + * @param size + * @return + */ Review comment: TODO: missing comments. ########## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ########## @@ -16,57 +16,75 @@ */ package org.apache.kafka.raft; -import org.apache.kafka.common.record.Records; - import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; -public interface RaftClient { +public interface RaftClient<T> { + + interface Listener<T> { + /** + * Callback which is invoked for all records committed to the log. 
+ * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. + * + * Note that there is not a one-to-one correspondence between writes through + * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation + * is free to batch together the records from multiple append calls provided + * that batch boundaries are respected. This means that each batch specified + * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated + */ + void handleCommit(BatchReader<T> reader); + + /** + * Invoked after this node has become a leader. This is only called after + * all commits up to the start of the leader's epoch have been sent to + * {@link #handleCommit(BatchReader)}. + * + * After becoming a leader, the client is eligible to write to the log + * using {@link #scheduleAppend(int, List)}. + * + * @param epoch the claimed leader epoch + */ + default void handleClaim(int epoch) {} + + /** + * Invoked after a leader has stepped down. This callback may or may not + * fire before the next leader has been elected. + */ + default void handleResign() {} + } /** - * Initialize the client. This should only be called once and it must be - * called before any of the other APIs can be invoked. + * Initialize the client. This should only be called once on startup. * * @throws IOException For any IO errors during initialization */ void initialize() throws IOException; /** - * Append a new entry to the log. The client must be in the leader state to - * accept an append: it is up to the state machine implementation - * to ensure this using {@link #currentLeaderAndEpoch()}. - * - * TODO: One improvement we can make here is to allow the caller to specify - * the current leader epoch in the record set. 
That would ensure that each - * leader change must be "observed" by the state machine before new appends - * are accepted. - * - * @param records The records to append to the log - * @param timeoutMs Maximum time to wait for the append to complete - * @return A future containing the last offset and epoch of the appended records (if successful) - */ - CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs); - - /** - * Read a set of records from the log. Note that it is the responsibility of the state machine - * to filter control records added by the Raft client itself. - * - * If the fetch offset is no longer valid, then the future will be completed exceptionally - * with a {@link LogTruncationException}. + * Register a listener to get commit/leader notifications. * - * @param position The position to fetch from - * @param isolation The isolation level to apply to the read - * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion - * @return The record set, which may be empty if fetching from the end of the log + * @param listener the listener */ - CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs); + void register(Listener<T> listener); /** - * Get the current leader (if known) and the current epoch. + * Append a list of records to the log. The write will be scheduled for some time + * in the future. There is no guarantee that appended records will be written to + * the log and eventually committed. However, it is guaranteed that if any of the + * records become committed, then all of them will be. * - * @return Current leader and epoch information + * @param epoch the current leader epoch + * @param records the list of records to append + * @return the offset within the current epoch that the log entries will be appended, + * or null if the leader was unable to accept the write (e.g. due to memory + * being reached). 
*/ - LeaderAndEpoch currentLeaderAndEpoch(); + Long scheduleAppend(int epoch, List<T> records); Review comment: I think it is okay as the fix may be non-trivial but technically `scheduleAppend` will accept records even if no `Listener` has received a `handleClaim`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org