hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510433916

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -1757,35 +1809,86 @@ public void complete() {
         }
     }
 
-    private static class UnwrittenAppend {
-        private final Records records;
-        private final long createTimeMs;
-        private final long requestTimeoutMs;
-        private final AckMode ackMode;
-        private final CompletableFuture<OffsetAndEpoch> future;
+    private final class ListenerContext implements CloseListener<BatchReader<T>> {
+        private final RaftClient.Listener<T> listener;
+        private BatchReader<T> lastSent = null;
+        private long lastAckedOffset = 0;
+        private int claimedEpoch = 0;
+
+        private ListenerContext(Listener<T> listener) {
+            this.listener = listener;
+        }
+
+        /**
+         * Get the last acked offset, which is one greater than the offset of the
+         * last record which was acked by the state machine.
+         */
+        public synchronized long lastAckedOffset() {
+            return lastAckedOffset;
+        }
+
+        /**
+         * Get the next expected offset, which might be larger than the last acked
+         * offset if there are inflight batches which have not been acked yet.
+         * Note that when fetching from disk, we may not know the last offset of
+         * inflight data until it has been processed by the state machine. In this case,
+         * we delay sending additional data until the state machine has read to the
+         * end and the last offset is determined.

Review comment:
When catching up from the log, yes. However, I have implemented an optimization
for writes from the leader. We save the original batch in memory so that it can
be sent back to the state machine after the write is committed. In this case, we
know the last offset of the batch, so we can have multiple inflight batches sent
to the controller. This is nice because it means the elected controller will not
have to read from disk.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
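
The distinction drawn in the review comment can be illustrated with a small, self-contained sketch. It is not the code from the pull request: the class `InflightOffsetSketch` and its methods `sendFromMemory`, `sendFromDisk`, and `ack` are hypothetical names chosen for illustration. The sketch assumes that a batch retained in memory by the leader carries a known last offset, so several such batches can be inflight at once, while a batch read from disk has an unknown last offset until the state machine has consumed it, so nothing further is sent until it is acked.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

/** Hypothetical sketch only; not the KafkaRaftClient implementation. */
final class InflightOffsetSketch {
    // One greater than the offset of the last record acked by the state machine.
    private long lastAckedOffset = 0;

    // Last offset of each inflight batch, oldest first. An empty value marks a
    // disk-backed batch whose last offset is not known until it has been read.
    private final Deque<OptionalLong> inflight = new ArrayDeque<>();

    long lastAckedOffset() {
        return lastAckedOffset;
    }

    /** Next expected offset, or empty while a disk-backed batch is inflight. */
    OptionalLong nextExpectedOffset() {
        OptionalLong newest = inflight.peekLast();
        if (newest == null) {
            return OptionalLong.of(lastAckedOffset);
        }
        return newest.isPresent()
            ? OptionalLong.of(newest.getAsLong() + 1)
            : OptionalLong.empty();
    }

    /** Leader path (assumed): the batch is held in memory, so its last offset is known. */
    boolean sendFromMemory(long baseOffset, int recordCount) {
        if (recordCount <= 0) {
            throw new IllegalArgumentException("recordCount must be positive");
        }
        OptionalLong next = nextExpectedOffset();
        if (!next.isPresent() || next.getAsLong() != baseOffset) {
            return false; // must wait, or the batch is not contiguous
        }
        inflight.addLast(OptionalLong.of(baseOffset + recordCount - 1));
        return true;
    }

    /** Catch-up path (assumed): the last offset is unknown until the batch is read. */
    boolean sendFromDisk() {
        if (!nextExpectedOffset().isPresent()) {
            return false; // an unread disk batch is already inflight
        }
        inflight.addLast(OptionalLong.empty());
        return true;
    }

    /** The state machine finished the oldest inflight batch and reports its last offset. */
    void ack(long lastOffsetRead) {
        inflight.removeFirst();
        lastAckedOffset = lastOffsetRead + 1;
    }
}
```

In this sketch, two in-memory batches can be sent back to back because the second one's base offset can be checked against the first one's known last offset. A disk-backed batch blocks further sends until `ack` supplies the offset that was actually reached.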