[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-11-03 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r516151645



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
 
log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
 logger.info("Truncated to offset {} from Fetch response 
from leader {}",
 truncationOffset, quorum.leaderIdOrNil());
-
-// Since the end offset has been updated, we should 
complete any delayed
-// reads at the end offset.
-fetchPurgatory.maybeComplete(
-new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-currentTimeMs);
+// After truncation, we complete all pending reads in 
order to
+// ensure that fetches account for the updated log end 
offset
+fetchPurgatory.completeAll(currentTimeMs);

Review comment:
   > With the new Listener when is this not a noop? Looking at the code, we only add entries to fetchPurgatory when the replica is a leader and it receives a Fetch request.
   
   Yeah, that's fair. I don't think we can truncate unless we are a follower 
and that implies we already cleared the purgatory in `onBecomeFollower`. So I 
think you are right that we are safe to remove this, though we'll probably need 
to add it back once we have follower fetching. 
   
   > I think the part that is missing is that the old leader should 
fetchPurgatory.completeAll when it loses leadership.
   
   I had considered this previously and decided to leave the fetches in 
purgatory while the election was in progress to prevent unnecessary retries 
since that is all the client can do while waiting for the outcome. On the other 
hand, some of the fetches in purgatory might be from other voters. It might be 
better to respond more quickly so that there are not any unnecessary election 
delays. I'd suggest we open a separate issue to consider this.
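
   To make the tradeoff concrete, a minimal, self-contained sketch of the "complete everything on resignation" option; the class and method names here are hypothetical and this is not KafkaRaftClient code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    // Toy model of the tradeoff discussed above: when leadership is lost, the old
    // leader can either complete the parked fetch futures right away (forcing the
    // waiting voters/clients to retry and discover the new leader) or leave them
    // parked until they expire.
    class ParkedFetches {
        private final List<CompletableFuture<Long>> parked = new ArrayList<>();

        CompletableFuture<Long> park() {
            CompletableFuture<Long> future = new CompletableFuture<>();
            parked.add(future);
            return future;
        }

        // Option discussed in the review: complete everything as soon as leadership
        // is lost so that other voters are not delayed during the election.
        void completeAllOnResignation(long currentTimeMs) {
            parked.forEach(future -> future.complete(currentTimeMs));
            parked.clear();
        }
    }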

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,80 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);

Review comment:
   I will add a comment. I agree it is a subtle point.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1778,4 +1808,98 @@ public void complete() {
 }
 }
 
+private final class ListenerContext implements CloseListener<BatchReader<T>> {
+private final RaftClient.Listener<T> listener;
+private BatchReader<T> lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;

Review comment:
   Let me add a helper to `ListenerContext` so that we can keep the field 
encapsulated.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1778,4 +1808,98 @@ public void complete() {
 }
 }
 
+private final class ListenerContext implements CloseListener<BatchReader<T>> {
+private final RaftClient.Listener<T> listener;
+private BatchReader<T> lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;
+
+private ListenerContext(Listener listener) {
+this.listener = listener;
+}
+
+/**
+ * Get the last acked offset, which is one greater than the offset of the
+ * last record which was acked by the state machine.
+ */
+public synchronized long 

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514585353



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -26,30 +24,53 @@
 
 interface Listener {
 /**
- * Callback which is invoked when records written through {@link #scheduleAppend(int, List)}
- * become committed.
+ * Callback which is invoked for all records committed to the log.
+ * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+ * after consuming the reader.
  *
  * Note that there is not a one-to-one correspondence between writes through
  * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
  * is free to batch together the records from multiple append calls provided
  * that batch boundaries are respected. This means that each batch specified
  * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
- * a batch passed to {@link #handleCommit(int, long, List)}.
+ * a batch provided by the {@link BatchReader}.
+ *
+ * @param reader reader instance which must be iterated and closed
+ */
+void handleCommit(BatchReader reader);
+
+/**
+ * Invoked after this node has become a leader. This is only called after
+ * all commits up to the start of the leader's epoch have been sent to
+ * {@link #handleCommit(BatchReader)}.
+ *
+ * After becoming a leader, the client is eligible to write to the log
+ * using {@link #scheduleAppend(int, List)}.
  *
- * @param epoch the epoch in which the write was accepted
- * @param lastOffset the offset of the last record in the record list
- * @param records the set of records that were committed
+ * @param epoch the claimed leader epoch
  */
-void handleCommit(int epoch, long lastOffset, List records);
+default void handleClaim(int epoch) {}

Review comment:
   Yeah, I considered using `handleBecomeLeader` and `handleResignLeader`. 
In the end, I decided to use the more concise `handleClaim` and `handleResign` 
names which are used in the kip-500 branch. 
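
   For reference, a bare-bones listener along the lines of the interface shown above. This is an illustrative sketch, not code from the patch, and it assumes the generic Listener<T>/BatchReader<T> shape:

    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.raft.RaftClient;

    // Illustrative only: a state machine that applies committed String records and
    // tracks the epoch it has claimed, following the handleCommit/handleClaim/handleResign
    // contract described in the javadoc above.
    public class CountingListener implements RaftClient.Listener<String> {
        private final AtomicLong appliedRecords = new AtomicLong(0);
        private volatile int claimedEpoch = -1;

        @Override
        public void handleCommit(BatchReader<String> reader) {
            try {
                while (reader.hasNext()) {
                    BatchReader.Batch<String> batch = reader.next();
                    appliedRecords.addAndGet(batch.records().size());
                }
            } finally {
                // The listener owns the reader and must close it when done
                reader.close();
            }
        }

        @Override
        public void handleClaim(int epoch) {
            claimedEpoch = epoch;   // safe to call scheduleAppend from now on
        }

        @Override
        public void handleResign() {
            claimedEpoch = -1;      // stop writing until leadership is claimed again
        }
    }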





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514584469



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) throws IOException {
 }
 }
 
+private void pollListeners() {
+// Register any listeners added since the last poll
+while (!pendingListeners.isEmpty()) {
+Listener listener = pendingListeners.poll();
+listenerContexts.add(new ListenerContext(listener));
+}
+
+// Check listener progress to see if reads are expected
+quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
+long highWatermark = highWatermarkMetadata.offset;
+
+List listenersToUpdate = listenerContexts.stream()
+.filter(listenerContext -> {
+OptionalLong nextExpectedOffset = listenerContext.nextExpectedOffset();
+return nextExpectedOffset.isPresent() && nextExpectedOffset.getAsLong() < highWatermark;
+})
+.collect(Collectors.toList());
+
+maybeFireHandleCommit(listenersToUpdate, highWatermarkMetadata.offset);
+});
+}
+
 public void poll() throws IOException {
 GracefulShutdown gracefulShutdown = shutdown.get();
 if (gracefulShutdown != null) {
 pollShutdown(gracefulShutdown);
 } else {
+pollListeners();

Review comment:
   Hmm, that's a fair question. I think the listeners will tend to get new 
data in two cases: 1) high watermark advanced, or 2) a previous read completes. 
In the first case, the high watermark only advances in response to a request, 
so there should be no delay. In the second case, we call `wakeup()` to take us 
out of the network poll, so I think there also should be no delay. Can you 
think of a case where there would be a delay?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514539737



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -329,8 +387,9 @@ private void appendLeaderChangeMessage(LeaderState state, long currentTimeMs) {
 }
 
 private void flushLeaderLog(LeaderState state, long currentTimeMs) {
-log.flush();
+// We update the end offset before flushing so that parked fetches can return sooner

Review comment:
   Yeah, it's ok for followers to see uncommitted or even unflushed data. 
The main thing is that we avoid advancing the high watermark until the fsync 
completes. Note that this is the main reason that we had to do KAFKA-10527. 
Without this fix, it was possible for the leader to continue in the same epoch 
after a start, which means that it could lose and overwrite unflushed data.
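
   A simplified model of that ordering constraint, using a hypothetical LocalLog type rather than the real ReplicatedLog API; this is a sketch of the invariant, not the patch's code:

    // Followers may fetch uncommitted, even unflushed, data; the invariant is only
    // that the leader fsyncs before the high watermark (and hence handleCommit) can
    // advance past those records.
    class LeaderAppendPath {
        interface LocalLog {
            long appendAsLeader(byte[] record, int epoch); // returns new end offset
            void flush();                                  // fsync to disk
        }

        private final LocalLog log;
        private volatile long flushedEndOffset = 0;

        LeaderAppendPath(LocalLog log) {
            this.log = log;
        }

        long append(byte[] record, int epoch) {
            // 1. Update the end offset first so parked fetches can return sooner.
            long endOffset = log.appendAsLeader(record, epoch);
            // 2. Flush before allowing the high watermark to advance to endOffset.
            log.flush();
            flushedEndOffset = endOffset;
            return endOffset;
        }

        // The replication layer must never advance the high watermark beyond this.
        long maxAllowedHighWatermark() {
            return flushedEndOffset;
        }
    }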





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514537694



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+continue;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List records) {

Review comment:
   The only difference is the input. I will add some comments to try and 
clarify the usage.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514536828



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+continue;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+continue;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();

Review comment:
   I will move this outside the loop.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514535761



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ExpirationService;
+
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ThresholdPurgatory<T extends Comparable<T>> implements FuturePurgatory<T> {
+private final AtomicLong idGenerator = new AtomicLong(0);
+private final ExpirationService expirationService;
+private final ConcurrentNavigableMap<ThresholdKey<T>, CompletableFuture<Long>> thresholdMap =
+new ConcurrentSkipListMap<>();
+
+public ThresholdPurgatory(ExpirationService expirationService) {
+this.expirationService = expirationService;
+}
+
+@Override
+public CompletableFuture<Long> await(T threshold, long maxWaitTimeMs) {
+ThresholdKey<T> key = new ThresholdKey<>(idGenerator.incrementAndGet(), threshold);
+CompletableFuture<Long> future = expirationService.await(maxWaitTimeMs);
+thresholdMap.put(key, future);
+future.whenComplete((timeMs, exception) -> thresholdMap.remove(key));
+return future;
+}
+
+@Override
+public void maybeComplete(T value, long currentTimeMs) {
+ThresholdKey<T> maxKey = new ThresholdKey<>(Long.MAX_VALUE, value);
+NavigableMap<ThresholdKey<T>, CompletableFuture<Long>> submap = thresholdMap.headMap(maxKey);
+for (CompletableFuture<Long> completion : submap.values()) {
+completion.complete(currentTimeMs);
+}
+}
+
+@Override
+public void completeAll(long currentTimeMs) {
+for (CompletableFuture<Long> completion : thresholdMap.values()) {
+completion.complete(currentTimeMs);
+}
+}
+
+@Override
+public void completeAllExceptionally(Throwable exception) {
+for (CompletableFuture<Long> completion : thresholdMap.values()) {
+completion.completeExceptionally(exception);
+}
+}
+
+@Override
+public int numWaiting() {
+return thresholdMap.size();
+}
+
+private static class ThresholdKey<T extends Comparable<T>> implements Comparable<ThresholdKey<T>> {

Review comment:
   Not at the moment. I guess your point is that we might be able to drop 
the generics, which is fair. I think we can also drop the `FuturePurgatory` 
interface. Is it ok if we save this for a follow-up?
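
   For intuition, a self-contained model of the threshold-completion behavior sketched above, using a plain TreeMap instead of the concurrent skip list and omitting the expiration service; the class name and details are hypothetical:

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;

    // Waiters register a future keyed by the offset they are waiting for, and a
    // completion value releases every waiter at or below it (which is what the
    // headMap call above appears to implement). Simplified: no thread safety.
    class SimpleThresholdPurgatory {
        private long idGenerator = 0;
        private final TreeMap<long[], CompletableFuture<Long>> waiting =
            new TreeMap<>((a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));

        CompletableFuture<Long> await(long threshold) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            waiting.put(new long[]{threshold, idGenerator++}, future);
            return future;
        }

        void maybeComplete(long value, long currentTimeMs) {
            // Complete every waiter whose threshold is <= value
            Map<long[], CompletableFuture<Long>> ready =
                waiting.headMap(new long[]{value, Long.MAX_VALUE}, true);
            ready.values().forEach(f -> f.complete(currentTimeMs));
            ready.clear();
        }
    }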





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514533712



##
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.OptionalLong;
+
+public class RecordsBatchReader<T> implements BatchReader<T> {

Review comment:
   `MemoryBatchReader` is only used for writes from the leader. We retain 
the original records from the call to `scheduleAppend` and send them to the 
listener in `handleCommit`. This is useful because it ensures the active 
controller will not need to read from disk.
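
   A rough sketch of that idea, i.e. an in-memory reader wrapping the records retained from scheduleAppend; this is an illustration of the concept, not the MemoryBatchReader from the patch:

    import java.util.Collections;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.OptionalLong;
    import org.apache.kafka.raft.BatchReader;

    // The leader keeps the records passed to scheduleAppend in memory and replays
    // them to handleCommit once committed, so no disk read is needed.
    class SingleBatchInMemoryReader<T> implements BatchReader<T> {
        private final Batch<T> batch;
        private boolean consumed = false;

        SingleBatchInMemoryReader(long baseOffset, int epoch, List<T> records) {
            this.batch = new Batch<>(baseOffset, epoch, Collections.unmodifiableList(records));
        }

        @Override
        public boolean hasNext() {
            return !consumed;
        }

        @Override
        public Batch<T> next() {
            if (consumed) throw new NoSuchElementException();
            consumed = true;
            return batch;
        }

        @Override
        public long baseOffset() {
            return batch.baseOffset();
        }

        @Override
        public OptionalLong lastOffset() {
            // Known up front for in-memory writes, unlike reads from disk
            return OptionalLong.of(batch.lastOffset());
        }

        @Override
        public void close() {
            // Nothing to release; the retained records are simply dropped
        }
    }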





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514518360



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an interface
+ * is that it allows us to push blocking operations such as reads from disk outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {
+
+/**
+ * Get the base offset of the readable batches. Note that this value is a 
constant
+ * which is defined when the {@link BatchReader} instance is constructed. 
It does
+ * not change based on reader progress.
+ *
+ * @return the base offset
+ */
+long baseOffset();
+
+/**
+ * Get the last offset of the batch if it is known. When reading from 
disk, we may
+ * not know the last offset of a set of records until it has been read 
from disk.
+ * In this case, the state machine cannot advance to the next committed 
data until
+ * all batches from the {@link BatchReader} instance have been consumed.
+ *
+ * @return optional last offset
+ */
+OptionalLong lastOffset();
+
+/**
+ * Close this reader. It is the responsibility of the {@link 
RaftClient.Listener}
+ * to close each reader passed to {@link 
RaftClient.Listener#handleCommit(BatchReader)}.
+ */
+@Override
+void close();
+
+class Batch<T> {
+private final long baseOffset;
+private final int epoch;
+private final List<T> records;
+
+public Batch(long baseOffset, int epoch, List<T> records) {
+this.baseOffset = baseOffset;
+this.epoch = epoch;
+this.records = records;
+}
+
+public long lastOffset() {
+return baseOffset + records.size() - 1;
+}
+
+public long baseOffset() {
+return baseOffset;
+}
+
+public List<T> records() {
+return records;
+}
+
+public int epoch() {
+return epoch;
+}
+
+@Override
+public String toString() {
+return "Batch(" +
+"baseOffset=" + baseOffset +
+", epoch=" + epoch +
+", records=" + records +
+')';
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+Batch batch = (Batch) o;
+return baseOffset == batch.baseOffset &&
+epoch == batch.epoch &&
+Objects.equals(records, batch.records);

Review comment:
   We are relying on the standard `equals` method. I think it's up to the 
user of the api to ensure a reasonable implementation if they expect to rely on 
batch equality. The raft implementation does not set any expectations on record 
equality, but it is useful in testing where we can control the record type.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514516570



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an interface
+ * is that it allows us to push blocking operations such as reads from disk outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will not
+ * affect replication.
+ *
+ * @param <T> record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {
+
+/**
+ * Get the base offset of the readable batches. Note that this value is a 
constant
+ * which is defined when the {@link BatchReader} instance is constructed. 
It does
+ * not change based on reader progress.
+ *
+ * @return the base offset
+ */
+long baseOffset();
+
+/**
+ * Get the last offset of the batch if it is known. When reading from 
disk, we may
+ * not know the last offset of a set of records until it has been read 
from disk.
+ * In this case, the state machine cannot advance to the next committed 
data until
+ * all batches from the {@link BatchReader} instance have been consumed.
+ *
+ * @return optional last offset
+ */
+OptionalLong lastOffset();
+
+/**
+ * Close this reader. It is the responsibility of the {@link 
RaftClient.Listener}
+ * to close each reader passed to {@link 
RaftClient.Listener#handleCommit(BatchReader)}.
+ */
+@Override
+void close();
+
+class Batch<T> {
+private final long baseOffset;
+private final int epoch;
+private final List<T> records;
+
+public Batch(long baseOffset, int epoch, List<T> records) {
+this.baseOffset = baseOffset;
+this.epoch = epoch;
+this.records = records;
+}
+
+public long lastOffset() {
+return baseOffset + records.size() - 1;
+}
+
+public long baseOffset() {
+return baseOffset;
+}
+
+public List<T> records() {
+return records;
+}
+
+public int epoch() {
+return epoch;
+}
+
+@Override
+public String toString() {
+return "Batch(" +
+"baseOffset=" + baseOffset +
+", epoch=" + epoch +
+", records=" + records +

Review comment:
   Yeah, I'm relying on the `toString`. I think this is only useful for 
debugging.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514515763



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+protected final DataInputStream input;
+
+public DataInputStreamReadable(DataInputStream input) {
+this.input = input;
+}
+
+@Override
+public byte readByte() {
+try {
+return input.readByte();
+} catch (IOException e) {

Review comment:
   `IOException` is checked, so we cannot raise it from the current `Readable` interface. The options are either to add the exception to the `Readable` interface or to rethrow it as an unchecked exception. I went with the latter to reduce the impact, and because I think we tend to prefer unchecked exceptions in general: checked exceptions end up leaking their way through a bunch of call stacks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514514051



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+protected final DataInputStream input;
+
+public DataInputStreamReadable(DataInputStream input) {
+this.input = input;
+}
+
+@Override
+public byte readByte() {
+try {
+return input.readByte();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public short readShort() {
+try {

Review comment:
   Yeah, I could introduce a helper with a lambda, but that would add some 
unnecessary garbage to the deserialization path. Although it is ugly, I think 
the duplication is not a big deal. We probably won't touch this class after it 
is created.
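
   For comparison, the rejected lambda-based helper might have looked roughly like this; it removes the repeated try/catch at the cost of an extra allocation per read. Hypothetical code, not part of the patch:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;

    class UncheckedReads {
        @FunctionalInterface
        interface IoSupplier<T> {
            T get() throws IOException;
        }

        // Wrap the checked IOException in an unchecked exception in one place
        static <T> T unchecked(IoSupplier<T> read) {
            try {
                return read.get();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        // Example usage mirroring DataInputStreamReadable.readByte/readShort
        static byte readByte(DataInputStream input) {
            return unchecked(input::readByte);
        }

        static short readShort(DataInputStream input) {
            return unchecked(input::readShort);
        }
    }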





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514512213



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
##
@@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) {
 protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
 final double boundedValue;
 if (value > max) {
-log.warn("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",
+log.debug("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",

Review comment:
   The `warn` seemed excessive for a metric update, which could be done 
very frequently. I looked over the code and it looks like we don't really use 
this outside of tests (and now the code added in this patch). I think the user 
should just understand the contract, which is that anything outside of the 
specified range gets rounded down.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514504593



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) throws IOException {
 }
 }
 
+private void pollListeners() {
+// Register any listeners added since the last poll
+while (!pendingListeners.isEmpty()) {

Review comment:
   I doubt we would use it in practice, though I guess it would open the 
door to changing roles dynamically, which might be interesting in the future. 
That said, it was simple to add and useful in testing since it gave me an easy 
way to initialize a state where a listener had not caught up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510513954



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();
+
+// We can fire `handleClaim` as soon as the listener has caught
+// up to the start of the leader epoch. This guarantees that the
+// state machine has seen the full committed state before it becomes
+// leader and begins writing to the log.

Review comment:
   I guess let's keep this option in our back pocket for now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510455713



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();
+
+// We can fire `handleClaim` as soon as the listener has caught
+// up to the start of the leader epoch. This guarantees that the
+// state machine has seen the full committed state before it becomes
+// leader and begins writing to the log.

Review comment:
   I thought a little about it. Right now the state machine has just two 
states: 1) I am not a leader, and 2) I am a leader and have caught up with all 
committed data from previous epochs.  An alternative design is to fire 
`handleClaim` immediately and provide the starting offset of the leader epoch. 
Then the controller can wait until its state machine has caught up to that 
offset before starting to write data. In the end, I decided not to do it 
because it adds a third state and I did not expect the controller would be able 
to do anything useful in the additional state. The point about heartbeats is 
interesting, but even that seems tricky since the controller would not know if 
a broker had been fenced until it has caught up. I think the only thing the 
controller could do is hold the requests in purgatory, which might be better 
than letting them retry, but not sure it's worth it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510438318



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -206,30 +234,77 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+return;

Review comment:
   Good catch. I think that I forgot to update this when moving from a 
single listener to multiple listeners.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510437477



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) throws IOException {
 if (!candidateState.isVoteGranted())
 throw new IllegalStateException("Cannot become leader without majority votes granted");
 
+// Note that the leader does not retain the high watermark that was known
+// in the previous state. The purpose of this is to protect the monotonicity

Review comment:
   It was intended to refer to the behavior of not retaining the high 
watermark from the previous sentence. I will attempt to clarify.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510433916



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1757,35 +1809,86 @@ public void complete() {
 }
 }
 
-private static class UnwrittenAppend {
-private final Records records;
-private final long createTimeMs;
-private final long requestTimeoutMs;
-private final AckMode ackMode;
-private final CompletableFuture future;
+private final class ListenerContext implements CloseListener<BatchReader<T>> {
+private final RaftClient.Listener<T> listener;
+private BatchReader<T> lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;
+
+private ListenerContext(Listener listener) {
+this.listener = listener;
+}
+
+/**
+ * Get the last acked offset, which is one greater than the offset of the
+ * last record which was acked by the state machine.
+ */
+public synchronized long lastAckedOffset() {
+return lastAckedOffset;
+}
+
+/**
+ * Get the next expected offset, which might be larger than the last acked
+ * offset if there are inflight batches which have not been acked yet.
+ * Note that when fetching from disk, we may not know the last offset of
+ * inflight data until it has been processed by the state machine. In this case,
+ * we delay sending additional data until the state machine has read to the
+ * end and the last offset is determined.

Review comment:
   When catching up from the log, yes. However, I have implemented an 
optimization for writes from the leader. We save the original batch in memory 
so that it can be sent back to the state machine after the write is committed. 
In this case, we know the last offset of the batch, so we can have multiple 
inflight batches sent to the controller. This is nice because it means the 
elected controller will not have to read from disk.
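
   A simplified model of that bookkeeping, with hypothetical names rather than the ListenerContext from the patch:

    import java.util.OptionalLong;

    // A read from disk has an unknown last offset until the state machine consumes
    // it, so no further data is sent behind it; a batch the leader retained in
    // memory has a known last offset, so more batches can be pipelined behind it.
    class ListenerProgress {
        private long nextOffsetToSend = 0;
        private boolean diskReadInflight = false;

        // Empty while a disk read is outstanding: we cannot know which offset to
        // send next until the state machine reports how far that read went.
        OptionalLong nextExpectedOffset() {
            return diskReadInflight ? OptionalLong.empty() : OptionalLong.of(nextOffsetToSend);
        }

        void sentDiskRead() {
            diskReadInflight = true;
        }

        void sentInMemoryBatch(long lastOffsetOfBatch) {
            // Last offset known up front, so we can keep pipelining
            nextOffsetToSend = lastOffsetOfBatch + 1;
        }

        void acked(long lastConsumedOffset) {
            // The state machine finished a batch; for disk reads this is the first
            // point at which the last offset becomes known.
            diskReadInflight = false;
            nextOffsetToSend = Math.max(nextOffsetToSend, lastConsumedOffset + 1);
        }
    }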





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510432303



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+interface Listener<T> {
+/**
+ * Callback which is invoked for all records committed to the log.
+ * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+ * after consuming the reader.
+ *
+ * Note that there is not a one-to-one correspondence between writes 
through
+ * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
+ * is free to batch together the records from multiple append calls 
provided
+ * that batch boundaries are respected. This means that each batch 
specified
+ * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
+ * a batch provided by the {@link BatchReader}.
+ *
+ * @param reader reader instance which must be iterated
+ */
+void handleCommit(BatchReader<T> reader);
+
+/**
+ * Invoked after this node has become a leader. This is only called 
after
+ * all commits up to the start of the leader's epoch have been sent to
+ * {@link #handleCommit(BatchReader)}.
+ *
+ * After becoming a leader, the client is eligible to write to the log
+ * using {@link #scheduleAppend(int, List)}.
+ *
+ * @param epoch the claimed leader epoch
+ */
+default void handleClaim(int epoch) {}
+
+/**
+ * Invoked after a leader has stepped down. This callback may or may 
not
+ * fire before the next leader has been elected.
+ */
+default void handleResign() {}
+}
 
 /**
- * Initialize the client. This should only be called once and it must be
- * called before any of the other APIs can be invoked.
+ * Initialize the client. This should only be called once on startup.
  *
  * @throws IOException For any IO errors during initialization
  */
 void initialize() throws IOException;
 
 /**
- * Append a new entry to the log. The client must be in the leader state to
- * accept an append: it is up to the state machine implementation
- * to ensure this using {@link #currentLeaderAndEpoch()}.
- *
- * TODO: One improvement we can make here is to allow the caller to specify
- * the current leader epoch in the record set. That would ensure that each
- * leader change must be "observed" by the state machine before new appends
- * are accepted.
- *
- * @param records The records to append to the log
- * @param timeoutMs Maximum time to wait for the append to complete
- * @return A future containing the last offset and epoch of the appended records (if successful)
- */
-CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs);
-
-/**
- * Read a set of records from the log. Note that it is the responsibility of the state machine
- * to filter control records added by the Raft client itself.
- *
- * If the fetch offset is no longer valid, then the future will be completed exceptionally
- * with a {@link LogTruncationException}.
+ * Register a listener to get commit/leader notifications.
  *
- * @param position The position to fetch from
- * @param isolation The isolation level to apply to the read
- * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion
- * @return The record set, which may be empty if fetching from the end of the log
+ * @param listener the listener
  */
-CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs);
+void register(Listener<T> listener);
 
 /**
- * Get the current leader (if known) and the current epoch.
+ * Append a list of records to the log. The write will be scheduled for some time
+ * in the future. There is no guarantee that appended records will be written to
+ * the log and eventually committed. However, it is guaranteed that if any of the
+ * records become committed, then all of them will be.
  *
- * @return Current leader and epoch information
+ * @param epoch the current leader epoch
+ * @param records the list of records to append
+ * @return the offset within the current epoch that the log entries will be appended,
+ * or null if the leader was unable to accept the write (e.g. due to memory
+ * being reached).
  */
-LeaderAndEpoch currentLeaderAndEpoch();
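   For readers following the new interface, a minimal, hedged sketch of how a state machine might use it is below. Only the methods quoted in this hunk (register, scheduleAppend, handleCommit, handleClaim, handleResign) are relied on; the iteration API of BatchReader, its package, and the return type of scheduleAppend (taken here to be a nullable Long, based on the @return description above) are assumptions, as are all class and variable names:
   
   ```java
   import java.util.List;
   import org.apache.kafka.raft.BatchReader;   // assumed package, matching RaftClient
   import org.apache.kafka.raft.RaftClient;
   
   // Hedged sketch of a state machine hooking into the listener-based API above.
   public class EchoStateMachine implements RaftClient.Listener<String> {
       private final RaftClient<String> client;
       private volatile int claimedEpoch = -1;
   
       public EchoStateMachine(RaftClient<String> client) {
           this.client = client;
           client.register(this);   // begin receiving commit/leader notifications
       }
   
       @Override
       public void handleCommit(BatchReader<String> reader) {
           try {
               // Apply the committed batches to local state here. Each batch handed
               // to this callback respects the boundaries of the batches passed to
               // scheduleAppend, so a single append is never split across commits.
           } finally {
               reader.close();       // the listener is responsible for closing the reader
           }
       }
   
       @Override
       public void handleClaim(int epoch) {
           // All commits up to the start of this epoch have already been delivered,
           // so it is now safe to write as leader using this epoch.
           this.claimedEpoch = epoch;
       }
   
       @Override
       public void handleResign() {
           this.claimedEpoch = -1;
       }
   
       public boolean tryAppend(List<String> records) {
           if (claimedEpoch < 0) {
               return false;         // not currently the leader
           }
           // Return type assumed to be a nullable Long per the @return javadoc above
           Long offset = client.scheduleAppend(claimedEpoch, records);
           return offset != null;    // null means the write was rejected (e.g. memory limit)
       }
   }
   ```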

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-22 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r510426714



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -16,57 +16,75 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.record.Records;
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public interface RaftClient {
+public interface RaftClient<T> {
+
+interface Listener<T> {
+/**
+ * Callback which is invoked for all records committed to the log.
+ * It is the responsibility of the caller to invoke {@link BatchReader#close()}
+ * after consuming the reader.
+ *
+ * Note that there is not a one-to-one correspondence between writes through
+ * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation
+ * is free to batch together the records from multiple append calls provided
+ * that batch boundaries are respected. This means that each batch specified
+ * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of
+ * a batch provided by the {@link BatchReader}.
+ *
+ * @param reader reader instance which must be iterated
+ */
+void handleCommit(BatchReader<T> reader);
+
+/**
+ * Invoked after this node has become a leader. This is only called after
+ * all commits up to the start of the leader's epoch have been sent to
+ * {@link #handleCommit(BatchReader)}.
+ *
+ * After becoming a leader, the client is eligible to write to the log
+ * using {@link #scheduleAppend(int, List)}.
+ *
+ * @param epoch the claimed leader epoch
+ */
+default void handleClaim(int epoch) {}
+
+/**
+ * Invoked after a leader has stepped down. This callback may or may not
+ * fire before the next leader has been elected.
+ */
+default void handleResign() {}
+}
 
 /**
- * Initialize the client. This should only be called once and it must be
- * called before any of the other APIs can be invoked.
+ * Initialize the client. This should only be called once on startup.
  *
  * @throws IOException For any IO errors during initialization
  */
 void initialize() throws IOException;
 
 /**
- * Append a new entry to the log. The client must be in the leader state to
- * accept an append: it is up to the state machine implementation
- * to ensure this using {@link #currentLeaderAndEpoch()}.
- *
- * TODO: One improvement we can make here is to allow the caller to specify
- * the current leader epoch in the record set. That would ensure that each
- * leader change must be "observed" by the state machine before new appends
- * are accepted.
- *
- * @param records The records to append to the log
- * @param timeoutMs Maximum time to wait for the append to complete
- * @return A future containing the last offset and epoch of the appended records (if successful)
- */
-CompletableFuture<OffsetAndEpoch> append(Records records, AckMode ackMode, long timeoutMs);
-
-/**
- * Read a set of records from the log. Note that it is the responsibility of the state machine
- * to filter control records added by the Raft client itself.
- *
- * If the fetch offset is no longer valid, then the future will be completed exceptionally
- * with a {@link LogTruncationException}.
+ * Register a listener to get commit/leader notifications.
  *
- * @param position The position to fetch from
- * @param isolation The isolation level to apply to the read
- * @param maxWaitTimeMs The maximum time to wait for new data to become available before completion
- * @return The record set, which may be empty if fetching from the end of the log
+ * @param listener the listener
  */
-CompletableFuture<Records> read(OffsetAndEpoch position, Isolation isolation, long maxWaitTimeMs);
+void register(Listener<T> listener);
 
 /**
- * Get the current leader (if known) and the current epoch.
+ * Append a list of records to the log. The write will be scheduled for some time
+ * in the future. There is no guarantee that appended records will be written to
+ * the log and eventually committed. However, it is guaranteed that if any of the
+ * records become committed, then all of them will be.
  *
- * @return Current leader and epoch information
+ * @param epoch the current leader epoch
+ * @param records the list of records to append
+ * @return the offset within the current epoch that the log entries will be appended,
+ * or null if the leader was unable to accept the write (e.g. due to memory
+ * being reached).

Review comment:
   Agreed. I added this here: