abbccdda commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r513200010



##########
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+    protected final DataInputStream input;
+
+    public DataInputStreamReadable(DataInputStream input) {
+        this.input = input;
+    }
+
+    @Override
+    public byte readByte() {
+        try {
+            return input.readByte();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public short readShort() {
+        try {

Review comment:
       nit: we could refactor out the try-catch logic.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
##########
@@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) {
     protected void update(Sample sample, MetricConfig config, double value, 
long timeMs) {
         final double boundedValue;
         if (value > max) {
-            log.warn("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",
+            log.debug("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",

Review comment:
       Why do we change these log statements from warn to debug level?

##########
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##########
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) 
throws IOException {
         if (!candidateState.isVoteGranted())
             throw new IllegalStateException("Cannot become leader without 
majority votes granted");
 
+        // Note that the leader does not retain the high watermark that was 
known
+        // in the previous state. The reason it does not is to protect the 
monotonicity

Review comment:
       `it does not` could be removed.

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, 
long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save 
some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                continue;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset < highWatermark) {
+                LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+                listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+            }
+        }
+    }
+
+    private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> 
records) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                continue;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset == baseOffset) {
+                listenerContext.fireHandleCommit(baseOffset, epoch, records);
+            }
+        }
+    }
+
+    private void maybeFireHandleClaim(LeaderState state) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            int leaderEpoch = state.epoch();

Review comment:
       Why do we need to get the leader epoch repeatedly inside the loop? It could be read once before iterating.

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an 
interface
+ * is that it allows us to push blocking operations such as reads from disk 
outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will 
not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, 
Closeable {
+
+    /**
+     * Get the base offset of the readable batches. Note that this value is a 
constant
+     * which is defined when the {@link BatchReader} instance is constructed. 
It does
+     * not change based on reader progress.
+     *
+     * @return the base offset
+     */
+    long baseOffset();
+
+    /**
+     * Get the last offset of the batch if it is known. When reading from 
disk, we may
+     * not know the last offset of a set of records until it has been read 
from disk.
+     * In this case, the state machine cannot advance to the next committed 
data until
+     * all batches from the {@link BatchReader} instance have been consumed.
+     *
+     * @return optional last offset
+     */
+    OptionalLong lastOffset();
+
+    /**
+     * Close this reader. It is the responsibility of the {@link 
RaftClient.Listener}
+     * to close each reader passed to {@link 
RaftClient.Listener#handleCommit(BatchReader)}.
+     */
+    @Override
+    void close();
+
+    class Batch<T> {
+        private final long baseOffset;
+        private final int epoch;
+        private final List<T> records;
+
+        public Batch(long baseOffset, int epoch, List<T> records) {
+            this.baseOffset = baseOffset;
+            this.epoch = epoch;
+            this.records = records;
+        }
+
+        public long lastOffset() {
+            return baseOffset + records.size() - 1;
+        }
+
+        public long baseOffset() {
+            return baseOffset;
+        }
+
+        public List<T> records() {
+            return records;
+        }
+
+        public int epoch() {
+            return epoch;
+        }
+
+        @Override
+        public String toString() {
+            return "Batch(" +
+                "baseOffset=" + baseOffset +
+                ", epoch=" + epoch +
+                ", records=" + records +
+                ')';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Batch<?> batch = (Batch<?>) o;
+            return baseOffset == batch.baseOffset &&
+                epoch == batch.epoch &&
+                Objects.equals(records, batch.records);

Review comment:
       Does this compare `records` by content (element-wise equality) or by reference?

##########
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an 
interface
+ * is that it allows us to push blocking operations such as reads from disk 
outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will 
not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, 
Closeable {
+
+    /**
+     * Get the base offset of the readable batches. Note that this value is a 
constant
+     * which is defined when the {@link BatchReader} instance is constructed. 
It does
+     * not change based on reader progress.
+     *
+     * @return the base offset
+     */
+    long baseOffset();
+
+    /**
+     * Get the last offset of the batch if it is known. When reading from 
disk, we may
+     * not know the last offset of a set of records until it has been read 
from disk.
+     * In this case, the state machine cannot advance to the next committed 
data until
+     * all batches from the {@link BatchReader} instance have been consumed.
+     *
+     * @return optional last offset
+     */
+    OptionalLong lastOffset();
+
+    /**
+     * Close this reader. It is the responsibility of the {@link 
RaftClient.Listener}
+     * to close each reader passed to {@link 
RaftClient.Listener#handleCommit(BatchReader)}.
+     */
+    @Override
+    void close();
+
+    class Batch<T> {
+        private final long baseOffset;
+        private final int epoch;
+        private final List<T> records;
+
+        public Batch(long baseOffset, int epoch, List<T> records) {
+            this.baseOffset = baseOffset;
+            this.epoch = epoch;
+            this.records = records;
+        }
+
+        public long lastOffset() {
+            return baseOffset + records.size() - 1;
+        }
+
+        public long baseOffset() {
+            return baseOffset;
+        }
+
+        public List<T> records() {
+            return records;
+        }
+
+        public int epoch() {
+            return epoch;
+        }
+
+        @Override
+        public String toString() {
+            return "Batch(" +
+                "baseOffset=" + baseOffset +
+                ", epoch=" + epoch +
+                ", records=" + records +

Review comment:
       Can we be sure that the records have a meaningful string representation?

##########
File path: raft/src/main/java/org/apache/kafka/raft/ExpirationService.java
##########
@@ -16,11 +16,17 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.KafkaException;
+import java.util.concurrent.CompletableFuture;
 
-public class LogTruncationException extends KafkaException {
-
-    public LogTruncationException(String message) {
-        super(message);
-    }
+public interface ExpirationService {
+    /**
+     * Get a new completable future which will automatically expire with a
+     * {@link org.apache.kafka.common.errors.TimeoutException} after the 
provided
+     * timeout passes if it is not completed before then through some other 
means.

Review comment:
       nit: consider rewording `after the provided timeout passes if it is not completed` to `if not completed within the provided time limit`.

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/MemoryBatchReader.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.BatchReader;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+
+public class MemoryBatchReader<T> implements BatchReader<T> {
+    private final CloseListener<BatchReader<T>> closeListener;
+    private final Iterator<Batch<T>> iterator;
+    private final long baseOffset;
+    private final long lastOffset;
+
+    public MemoryBatchReader(
+        List<Batch<T>> batches,
+        CloseListener<BatchReader<T>> closeListener
+    ) {
+        if (batches.isEmpty()) {
+            throw new IllegalArgumentException();

Review comment:
       Should we add a description to the exception?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+    protected final DataInputStream input;
+
+    public DataInputStreamReadable(DataInputStream input) {
+        this.input = input;
+    }
+
+    @Override
+    public byte readByte() {
+        try {
+            return input.readByte();
+        } catch (IOException e) {

Review comment:
       Why do we wrap the IO exception here?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, 
long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save 
some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                continue;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset < highWatermark) {
+                LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+                listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+            }
+        }
+    }
+
+    private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> 
records) {

Review comment:
       It looks a bit odd to have two overloads of `maybeFireHandleCommit`. Could we name them differently, or add a comment explaining the different logic each one uses to decide when to fire the commit callback?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -329,8 +387,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
     }
 
     private void flushLeaderLog(LeaderState state, long currentTimeMs) {
-        log.flush();
+        // We update the end offset before flushing so that parked fetches can 
return sooner

Review comment:
       Why would this work? If the flush wasn't successful, could the fetched 
records be invalidated later?

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.OptionalLong;
+
+public class RecordsBatchReader<T> implements BatchReader<T> {

Review comment:
       How do we decide whether to use `RecordsBatchReader` or `MemoryBatchReader`?

##########
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ExpirationService;
+
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ThresholdPurgatory<T extends Comparable<T>> implements 
FuturePurgatory<T> {
+    private final AtomicLong idGenerator = new AtomicLong(0);
+    private final ExpirationService expirationService;
+    private final ConcurrentNavigableMap<ThresholdKey<T>, 
CompletableFuture<Long>> thresholdMap =
+        new ConcurrentSkipListMap<>();
+
+    public ThresholdPurgatory(ExpirationService expirationService) {
+        this.expirationService = expirationService;
+    }
+
+    @Override
+    public CompletableFuture<Long> await(T threshold, long maxWaitTimeMs) {
+        ThresholdKey<T> key = new 
ThresholdKey<>(idGenerator.incrementAndGet(), threshold);
+        CompletableFuture<Long> future = 
expirationService.await(maxWaitTimeMs);
+        thresholdMap.put(key, future);
+        future.whenComplete((timeMs, exception) -> thresholdMap.remove(key));
+        return future;
+    }
+
+    @Override
+    public void maybeComplete(T value, long currentTimeMs) {
+        ThresholdKey<T> maxKey = new ThresholdKey<>(Long.MAX_VALUE, value);
+        NavigableMap<ThresholdKey<T>, CompletableFuture<Long>> submap = 
thresholdMap.headMap(maxKey);
+        for (CompletableFuture<Long> completion : submap.values()) {
+            completion.complete(currentTimeMs);
+        }
+    }
+
+    @Override
+    public void completeAll(long currentTimeMs) {
+        for (CompletableFuture<Long> completion : thresholdMap.values()) {
+            completion.complete(currentTimeMs);
+        }
+    }
+
+    @Override
+    public void completeAllExceptionally(Throwable exception) {
+        for (CompletableFuture<Long> completion : thresholdMap.values()) {
+            completion.completeExceptionally(exception);
+        }
+    }
+
+    @Override
+    public int numWaiting() {
+        return thresholdMap.size();
+    }
+
+    private static class ThresholdKey<T extends Comparable<T>> implements 
Comparable<ThresholdKey<T>> {

Review comment:
       Do we have threshold types other than `Long`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##########
@@ -26,30 +24,53 @@
 
     interface Listener<T> {
         /**
-         * Callback which is invoked when records written through {@link 
#scheduleAppend(int, List)}
-         * become committed.
+         * Callback which is invoked for all records committed to the log.
+         * It is the responsibility of the caller to invoke {@link 
BatchReader#close()}
+         * after consuming the reader.
          *
          * Note that there is not a one-to-one correspondence between writes 
through
          * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
          * is free to batch together the records from multiple append calls 
provided
          * that batch boundaries are respected. This means that each batch 
specified
          * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
-         * a batch passed to {@link #handleCommit(int, long, List)}.
+         * a batch provided by the {@link BatchReader}.
+         *
+         * @param reader reader instance which must be iterated and closed
+         */
+        void handleCommit(BatchReader<T> reader);
+
+        /**
+         * Invoked after this node has become a leader. This is only called 
after
+         * all commits up to the start of the leader's epoch have been sent to
+         * {@link #handleCommit(BatchReader)}.
+         *
+         * After becoming a leader, the client is eligible to write to the log
+         * using {@link #scheduleAppend(int, List)}.
          *
-         * @param epoch the epoch in which the write was accepted
-         * @param lastOffset the offset of the last record in the record list
-         * @param records the set of records that were committed
+         * @param epoch the claimed leader epoch
          */
-        void handleCommit(int epoch, long lastOffset, List<T> records);
+        default void handleClaim(int epoch) {}

Review comment:
       What about just naming it `handleBecomeLeader`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) 
throws IOException {
         }
     }
 
+    private void pollListeners() {
+        // Register any listeners added since the last poll
+        while (!pendingListeners.isEmpty()) {
+            Listener<T> listener = pendingListeners.poll();
+            listenerContexts.add(new ListenerContext(listener));
+        }
+
+        // Check listener progress to see if reads are expected
+        quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
+            long highWatermark = highWatermarkMetadata.offset;
+
+            List<ListenerContext> listenersToUpdate = listenerContexts.stream()
+                .filter(listenerContext -> {
+                    OptionalLong nextExpectedOffset = 
listenerContext.nextExpectedOffset();
+                    return nextExpectedOffset.isPresent() && 
nextExpectedOffset.getAsLong() < highWatermark;
+                })
+                .collect(Collectors.toList());
+
+            maybeFireHandleCommit(listenersToUpdate, 
highWatermarkMetadata.offset);
+        });
+    }
+
     public void poll() throws IOException {
         GracefulShutdown gracefulShutdown = shutdown.get();
         if (gracefulShutdown != null) {
             pollShutdown(gracefulShutdown);
         } else {
+            pollListeners();

Review comment:
       Should we call `pollListeners` after `pollCurrentState` to get more 
recent updates quicker?

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) 
throws IOException {
         }
     }
 
+    private void pollListeners() {
+        // Register any listeners added since the last poll
+        while (!pendingListeners.isEmpty()) {

Review comment:
       Do we anticipate use cases for adding listeners on the fly? Right now I can only see one case, where a listener is registered statically by the test Raft server.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to