jsancio commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510385486



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save some work

Review comment:
       I would remove this comment. We can file an issue and fix it if this 
becomes a performance issue.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                return;

Review comment:
       Why a `return`? Did you mean to use `continue`? If this is supposed to be 
a `continue`, then maybe we can use `Optional.ifPresent`.
   
   Same comment for one of the other overloaded `maybeFireHandleCommit`.
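
   If `continue` is the intent, here is a rough sketch of what the loop could 
look like with `OptionalLong.ifPresent` instead of the early `return`. It reuses 
the fields and types from the diff above and is only an illustration, not a 
definitive suggestion:

```java
private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
    for (ListenerContext listenerContext : listenerContexts) {
        // Skip just this listener when it still has an inflight batch,
        // instead of returning from the whole method.
        listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
            if (nextExpectedOffset < highWatermark) {
                LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
                listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
            }
        });
    }
}
```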

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -975,12 +1029,9 @@ private boolean handleFetchResponse(
                 log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
                    logger.info("Truncated to offset {} from Fetch response from leader {}",
                         truncationOffset, quorum.leaderIdOrNil());
-
-                    // Since the end offset has been updated, we should complete any delayed
-                    // reads at the end offset.
-                    fetchPurgatory.maybeComplete(
-                        new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-                        currentTimeMs);
+                    // After truncation, we complete all pending reads in order to
+                    // ensure that fetches account for the

Review comment:
       Incomplete sentence.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                return;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset < highWatermark) {
+                LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+                listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+            }
+        }
+    }
+
+    private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                return;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset == baseOffset) {
+                listenerContext.fireHandleCommit(baseOffset, epoch, records);
+            }
+        }
+    }
+
+    private void maybeFireHandleClaim(LeaderState state) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            int leaderEpoch = state.epoch();
+
+            // We can fire `handleClaim` as soon as the listener has caught
+            // up to the start of the leader epoch. This guarantees that the
+            // state machine has seen the full committed state before it becomes
+            // leader and begins writing to the log.

Review comment:
       Interesting. It is good to hide this logic from the state machine. 
Looking at the epoch and not at the LEO is okay because at this point we 
guarantee that the only records with that epoch are control records (e.g. 
LeaderChangedMessage).
   
   I am wondering if the state machine may want to know this before it can 
process state machine requests. Maybe this is okay because the brokers/replicas 
will learn about the new leader through the `Fetch` and `BeginQuorum` protocol 
and not from the state machine (Kafka Controller) itself.
   
   It is possible that the leader will receive Kafka Controller messages from 
replicas/brokers before it knows that it is the leader. Most likely the Kafka 
Controller will reject them, but the replicas/brokers need to keep retrying. 
This is especially important for heartbeat messages.

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+    interface Listener<T> {
+        /**
+         * Callback which is invoked for all records committed to the log.
+         * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+         * after consuming the reader.
+         *
+         * Note that there is not a one-to-one correspondence between writes through
+         * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
+         * is free to batch together the records from multiple append calls provided
+         * that batch boundaries are respected. This means that each batch specified
+         * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
+         * a batch provided by the {@link BatchReader}.
+         *
+         * @param reader reader instance which must be iterated
+         */
+        void handleCommit(BatchReader<T> reader);
+
+        /**
+         * Invoked after this node has become a leader. This is only called after
+         * all commits up to the start of the leader's epoch have been sent to
+         * {@link #handleCommit(BatchReader)}.
+         *
+         * After becoming a leader, the client is eligible to write to the log
+         * using {@link #scheduleAppend(int, List)}.
+         *
+         * @param epoch the claimed leader epoch
+         */
+        default void handleClaim(int epoch) {}
+
+        /**
+         * Invoked after a leader has stepped down. This callback may or may not
+         * fire before the next leader has been elected.
+         */
+        default void handleResign() {}
+    }
 
     /**
-     * Initialize the client. This should only be called once and it must be
-     * called before any of the other APIs can be invoked.
+     * Initialize the client. This should only be called once on startup.
      *
      * @throws IOException For any IO errors during initialization
      */
     void initialize() throws IOException;
 
     /**
-     * Append a new entry to the log. The client must be in the leader state to
-     * accept an append: it is up to the state machine implementation
-     * to ensure this using {@link #currentLeaderAndEpoch()}.
-     *
-     * TODO: One improvement we can make here is to allow the caller to specify
-     * the current leader epoch in the record set. That would ensure that each
-     * leader change must be "observed" by the state machine before new appends
-     * are accepted.
-     *
-     * @param records The records to append to the log
-     * @param timeoutMs Maximum time to wait for the append to complete
-     * @return A future containing the last offset and epoch of the appended records (if successful)
-     */
-    CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs);
-
-    /**
-     * Read a set of records from the log. Note that it is the responsibility of the state machine
-     * to filter control records added by the Raft client itself.
-     *
-     * If the fetch offset is no longer valid, then the future will be completed exceptionally
-     * with a {@link LogTruncationException}.
+     * Register a listener to get commit/leader notifications.
      *
-     * @param position The position to fetch from
-     * @param isolation The isolation level to apply to the read
-     * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion
-     * @return The record set, which may be empty if fetching from the end of the log
+     * @param listener the listener
      */
-    CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs);
+    void register(Listener<T> listener);
 
     /**
-     * Get the current leader (if known) and the current epoch.
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. However, it is guaranteed that if any of the
+     * records become committed, then all of them will be.
      *
-     * @return Current leader and epoch information
+     * @param epoch the current leader epoch
+     * @param records the list of records to append
+     * @return the offset within the current epoch that the log entries will be appended,
+     *         or null if the leader was unable to accept the write (e.g. due to memory
+     *         being reached).

Review comment:
       We should also mention that `MAX_VALUE` is returned if the RaftClient is 
not the leader.
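
   For example, the `@return` javadoc could be extended along these lines (the 
exact wording below is only a sketch):

```java
/**
 * ...
 * @return the offset within the current epoch that the log entries will be appended,
 *         null if the leader was unable to accept the write (e.g. due to memory
 *         being reached), or Long.MAX_VALUE if this client is not the current leader
 */
```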

##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) throws IOException {
         if (!candidateState.isVoteGranted())
            throw new IllegalStateException("Cannot become leader without majority votes granted");
 
+        // Note that the leader does not retain the high watermark that was known
+        // in the previous state. The purpose of this is to protect the monotonicity

Review comment:
       What is "this" in this sentence? epochStartOffset?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1757,35 +1809,86 @@ public void complete() {
         }
     }
 
-    private static class UnwrittenAppend {
-        private final Records records;
-        private final long createTimeMs;
-        private final long requestTimeoutMs;
-        private final AckMode ackMode;
-        private final CompletableFuture<OffsetAndEpoch> future;
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;
+
+        private ListenerContext(Listener<T> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Get the last acked offset, which is one greater than the offset of the
+         * last record which was acked by the state machine.
+         */
+        public synchronized long lastAckedOffset() {
+            return lastAckedOffset;
+        }
+
+        /**
+         * Get the next expected offset, which might be larger than the last acked
+         * offset if there are inflight batches which have not been acked yet.
+         * Note that when fetching from disk, we may not know the last offset of
+         * inflight data until it has been processed by the state machine. In this case,
+         * we delay sending additional data until the state machine has read to the
+         * end and the last offset is determined.

Review comment:
       Does this mean that in practice, followers will have at most two batches 
in flight?
   1. The one that they are currently processing
   2. If they have read the last message/record in that batch, then the next 
batch in the log?

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+    interface Listener<T> {
+        /**
+         * Callback which is invoked for all records committed to the log.
+         * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+         * after consuming the reader.
+         *
+         * Note that there is not a one-to-one correspondence between writes through
+         * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
+         * is free to batch together the records from multiple append calls provided
+         * that batch boundaries are respected. This means that each batch specified
+         * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
+         * a batch provided by the {@link BatchReader}.
+         *
+         * @param reader reader instance which must be iterated

Review comment:
       nit: "... must be iterated and closed".
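
   As a usage illustration of that contract (iterate the batches, then close the 
reader), a minimal listener might look roughly like the code below. The batch 
iteration calls (`hasNext`, `next`, `records`) are assumptions about the 
`BatchReader` API, which is not shown in this hunk:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener that drains every committed batch into memory and
// closes the reader when it is done, as the javadoc above requires.
public class CollectingListener implements RaftClient.Listener<String> {
    private final List<String> committed = new ArrayList<>();

    @Override
    public void handleCommit(BatchReader<String> reader) {
        try {
            while (reader.hasNext()) {                      // assumed iterator-style API
                committed.addAll(reader.next().records());  // assumed accessor for a batch's records
            }
        } finally {
            reader.close();  // closing the reader is the listener's responsibility
        }
    }
}
```

   It would be attached with `raftClient.register(new CollectingListener())` 
before the client starts delivering commits.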

##########
File path: raft/src/main/java/org/apache/kafka/raft/RecordSerde.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.protocol.DataOutputStreamWritable;
+import org.apache.kafka.common.protocol.Readable;
+
+public interface RecordSerde<T> {
+    /**
+     * Create a new context object for to be used when serializing a batch of records.
+     * This allows for state to be shared between {@link #recordSize(Object, Object)}
+     * and {@link #write(Object, Object, DataOutputStreamWritable)}, which is useful
+     * in order to avoid redundant work (see e.g.
+     * {@link org.apache.kafka.common.protocol.ObjectSerializationCache}).
+     *
+     * @return context object or null if none is needed
+     */
+    default Object newWriteContext() {
+        return null;
+    }
+
+    /**
+     * Get the size of a record.
+     *
+     * @param data the record that will be serialized
+     * @param context context object created by {@link #newWriteContext()}
+     * @return the size in bytes of the serialized record
+     */
+    int recordSize(T data, Object context);
+
+
+    /**
+     * Write the record to the output stream.
+     *
+     * @param data the record to serialize and write
+     * @param context context object created by {@link #newWriteContext()}
+     * @param out the output stream to write the record to
+     */
+    void write(T data, Object context, DataOutputStreamWritable out);
+
+    /**
+     *
+     * @param input
+     * @param size
+     * @return
+     */

Review comment:
       TODO: missing comments.
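
   The method declaration itself is cut off by the hunk, but judging from the 
parameter names and the write-side docs above, the missing javadoc would 
presumably read something like this (a sketch only):

```java
    /**
     * Read a record from the given input.
     *
     * @param input the readable input to deserialize the record from
     * @param size the size in bytes of the serialized record
     * @return the deserialized record
     */
```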

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+    interface Listener<T> {
+        /**
+         * Callback which is invoked for all records committed to the log.
+         * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+         * after consuming the reader.
+         *
+         * Note that there is not a one-to-one correspondence between writes through
+         * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
+         * is free to batch together the records from multiple append calls provided
+         * that batch boundaries are respected. This means that each batch specified
+         * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
+         * a batch provided by the {@link BatchReader}.
+         *
+         * @param reader reader instance which must be iterated
+         */
+        void handleCommit(BatchReader<T> reader);
+
+        /**
+         * Invoked after this node has become a leader. This is only called after
+         * all commits up to the start of the leader's epoch have been sent to
+         * {@link #handleCommit(BatchReader)}.
+         *
+         * After becoming a leader, the client is eligible to write to the log
+         * using {@link #scheduleAppend(int, List)}.
+         *
+         * @param epoch the claimed leader epoch
+         */
+        default void handleClaim(int epoch) {}
+
+        /**
+         * Invoked after a leader has stepped down. This callback may or may not
+         * fire before the next leader has been elected.
+         */
+        default void handleResign() {}
+    }
 
     /**
-     * Initialize the client. This should only be called once and it must be
-     * called before any of the other APIs can be invoked.
+     * Initialize the client. This should only be called once on startup.
      *
      * @throws IOException For any IO errors during initialization
      */
     void initialize() throws IOException;
 
     /**
-     * Append a new entry to the log. The client must be in the leader state to
-     * accept an append: it is up to the state machine implementation
-     * to ensure this using {@link #currentLeaderAndEpoch()}.
-     *
-     * TODO: One improvement we can make here is to allow the caller to specify
-     * the current leader epoch in the record set. That would ensure that each
-     * leader change must be "observed" by the state machine before new appends
-     * are accepted.
-     *
-     * @param records The records to append to the log
-     * @param timeoutMs Maximum time to wait for the append to complete
-     * @return A future containing the last offset and epoch of the appended records (if successful)
-     */
-    CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs);
-
-    /**
-     * Read a set of records from the log. Note that it is the responsibility of the state machine
-     * to filter control records added by the Raft client itself.
-     *
-     * If the fetch offset is no longer valid, then the future will be completed exceptionally
-     * with a {@link LogTruncationException}.
+     * Register a listener to get commit/leader notifications.
      *
-     * @param position The position to fetch from
-     * @param isolation The isolation level to apply to the read
-     * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion
-     * @return The record set, which may be empty if fetching from the end of the log
+     * @param listener the listener
      */
-    CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs);
+    void register(Listener<T> listener);
 
     /**
-     * Get the current leader (if known) and the current epoch.
+     * Append a list of records to the log. The write will be scheduled for some time
+     * in the future. There is no guarantee that appended records will be written to
+     * the log and eventually committed. However, it is guaranteed that if any of the
+     * records become committed, then all of them will be.
      *
-     * @return Current leader and epoch information
+     * @param epoch the current leader epoch
+     * @param records the list of records to append
+     * @return the offset within the current epoch that the log entries will be appended,
+     *         or null if the leader was unable to accept the write (e.g. due to memory
+     *         being reached).
      */
-    LeaderAndEpoch currentLeaderAndEpoch();
+    Long scheduleAppend(int epoch, List<T> records);

Review comment:
       I think it is okay, since the fix may be non-trivial, but technically 
`scheduleAppend` will accept records even if no `Listener` has received a 
`handleClaim`.
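
   For what it is worth, a state machine can avoid depending on that behavior by 
gating its own writes on `handleClaim`. A rough, hypothetical sketch:

```java
import java.util.List;

// Hypothetical wrapper that only forwards appends once handleClaim has
// delivered the epoch the state machine intends to write with.
public class ClaimAwareWriter<T> implements RaftClient.Listener<T> {
    private final RaftClient<T> client;
    private volatile int claimedEpoch = -1;  // -1 means "not leader as far as we know"

    public ClaimAwareWriter(RaftClient<T> client) {
        this.client = client;
    }

    @Override
    public void handleClaim(int epoch) {
        claimedEpoch = epoch;
    }

    @Override
    public void handleResign() {
        claimedEpoch = -1;
    }

    @Override
    public void handleCommit(BatchReader<T> reader) {
        reader.close();  // a real implementation would consume the batches first
    }

    public Long tryAppend(List<T> records) {
        int epoch = claimedEpoch;
        if (epoch < 0) {
            return null;  // refuse locally instead of relying on scheduleAppend
        }
        return client.scheduleAppend(epoch, records);
    }
}
```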




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

