hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510432303



##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+    interface Listener<T> {
+        /**
+         * Callback which is invoked for all records committed to the log.
+         * It is the responsibility of the caller to invoke {@link 
BatchReader#close()}
+         * after consuming the reader.
+         *
+         * Note that there is not a one-to-one correspondence between writes 
through
+         * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
+         * is free to batch together the records from multiple append calls 
provided
+         * that batch boundaries are respected. This means that each batch 
specified
+         * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
+         * a batch provided by the {@link BatchReader}.
+         *
+         * @param reader reader instance which must be iterated
+         */
+        void handleCommit(BatchReader<T> reader);
+
+        /**
+         * Invoked after this node has become a leader. This is only called 
after
+         * all commits up to the start of the leader's epoch have been sent to
+         * {@link #handleCommit(BatchReader)}.
+         *
+         * After becoming a leader, the client is eligible to write to the log
+         * using {@link #scheduleAppend(int, List)}.
+         *
+         * @param epoch the claimed leader epoch
+         */
+        default void handleClaim(int epoch) {}
+
+        /**
+         * Invoked after a leader has stepped down. This callback may or may 
not
+         * fire before the next leader has been elected.
+         */
+        default void handleResign() {}
+    }
 
     /**
-     * Initialize the client. This should only be called once and it must be
-     * called before any of the other APIs can be invoked.
+     * Initialize the client. This should only be called once on startup.
      *
      * @throws IOException For any IO errors during initialization
      */
     void initialize() throws IOException;
 
     /**
-     * Append a new entry to the log. The client must be in the leader state to
-     * accept an append: it is up to the state machine implementation
-     * to ensure this using {@link #currentLeaderAndEpoch()}.
-     *
-     * TODO: One improvement we can make here is to allow the caller to specify
-     * the current leader epoch in the record set. That would ensure that each
-     * leader change must be "observed" by the state machine before new appends
-     * are accepted.
-     *
-     * @param records The records to append to the log
-     * @param timeoutMs Maximum time to wait for the append to complete
-     * @return A future containing the last offset and epoch of the appended 
records (if successful)
-     */
-    CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, 
long timeoutMs);
-
-    /**
-     * Read a set of records from the log. Note that it is the responsibility 
of the state machine
-     * to filter control records added by the Raft client itself.
-     *
-     * If the fetch offset is no longer valid, then the future will be 
completed exceptionally
-     * with a {@link LogTruncationException}.
+     * Register a listener to get commit/leader notifications.
      *
-     * @param position The position to fetch from
-     * @param isolation The isolation level to apply to the read
-     * @param maxWaitTimeMs The maximum time to wait for new data to become 
available before completion
-     * @return The record set, which may be empty if fetching from the end of 
the log
+     * @param listener the listener
      */
-    CompletableFuture<Records> read(OffsetAndEpoch position, Isolation 
isolation, long maxWaitTimeMs);
+    void register(Listener<T> listener);
 
     /**
-     * Get the current leader (if known) and the current epoch.
+     * Append a list of records to the log. The write will be scheduled for 
some time
+     * in the future. There is no guarantee that appended records will be 
written to
+     * the log and eventually committed. However, it is guaranteed that if any 
of the
+     * records become committed, then all of them will be.
      *
-     * @return Current leader and epoch information
+     * @param epoch the current leader epoch
+     * @param records the list of records to append
+     * @return the offset within the current epoch that the log entries will 
be appended,
+     *         or null if the leader was unable to accept the write (e.g. due 
to memory
+     *         being reached).
      */
-    LeaderAndEpoch currentLeaderAndEpoch();
+    Long scheduleAppend(int epoch, List<T> records);

Review comment:
       Yeah, I don't see a strong need to be too strict about this for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to