[GitHub] [kafka] abbccdda commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


abbccdda commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514482551



##
File path: core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.raft
+
+import java.util.concurrent.CompletableFuture
+
+import kafka.raft.TimingWheelExpirationService.TimerTaskCompletableFuture
+import kafka.utils.ShutdownableThread
+import kafka.utils.timer.{Timer, TimerTask}
+import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.raft.ExpirationService
+
+object TimingWheelExpirationService {
+  class TimerTaskCompletableFuture[T](override val delayMs: Long) extends CompletableFuture[T] with TimerTask {
+    override def run(): Unit = {
+      completeExceptionally(new TimeoutException(
+        s"Future failed to be completed before timeout of $delayMs ms was reached"))
+    }
+  }
+}
+
+class TimingWheelExpirationService(timer: Timer) extends ExpirationService {
+  private val expirationReaper = new ExpiredOperationReaper()
+
+  expirationReaper.start()
+
+  override def await[T](timeoutMs: Long): CompletableFuture[T] = {
+    val future = new TimerTaskCompletableFuture[T](timeoutMs)
+    future.whenComplete { (_, _) =>
+      future.cancel()
+    }
+    timer.add(future)
+    future
+  }
+
+  private class ExpiredOperationReaper extends ShutdownableThread(
+    name = "raft-expiration-reaper", isInterruptible = false) {
+
+    override def doWork(): Unit = {
+      timer.advanceClock(200L)

Review comment:
   Could we make 200L a constant?
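
   For illustration, a minimal Scala sketch of the kind of change being asked
   for, using a hypothetical constant name (the PR itself does not name one):

   import kafka.utils.ShutdownableThread
   import kafka.utils.timer.Timer

   object TimingWheelExpirationService {
     // Hypothetical name, not from the PR: how far each doWork() iteration
     // advances the timing wheel, in milliseconds.
     private val AdvanceIntervalMs = 200L
   }

   class TimingWheelExpirationService(timer: Timer) {
     private class ExpiredOperationReaper extends ShutdownableThread(
       name = "raft-expiration-reaper", isInterruptible = false) {

       // Same behavior as before, but the interval now has a name in one place.
       override def doWork(): Unit = {
         timer.advanceClock(TimingWheelExpirationService.AdvanceIntervalMs)
       }
     }
   }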





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


abbccdda commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r513200010



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+    protected final DataInputStream input;
+
+    public DataInputStreamReadable(DataInputStream input) {
+        this.input = input;
+    }
+
+    @Override
+    public byte readByte() {
+        try {
+            return input.readByte();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public short readShort() {
+        try {

Review comment:
   nit: we could refactor out the try-catch logic.
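
   For illustration, one possible shape for that refactor in Java (the
   IOSupplier and readOrThrow names are hypothetical, not from the PR; the
   Readable and Closeable interfaces are omitted to keep the sketch
   self-contained): every read is funneled through a single helper that
   translates the checked IOException.

   import java.io.DataInputStream;
   import java.io.IOException;

   public class DataInputStreamReadable {
       protected final DataInputStream input;

       public DataInputStreamReadable(DataInputStream input) {
           this.input = input;
       }

       // Hypothetical helper type: a read that may throw IOException.
       @FunctionalInterface
       private interface IOSupplier<T> {
           T get() throws IOException;
       }

       // The single place where the try-catch now lives.
       private <T> T readOrThrow(IOSupplier<T> reader) {
           try {
               return reader.get();
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }

       public byte readByte() {
           return readOrThrow(input::readByte);
       }

       public short readShort() {
           return readOrThrow(input::readShort);
       }
   }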

##
File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
##
@@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) {
     protected void update(Sample sample, MetricConfig config, double value, long timeMs) {
         final double boundedValue;
         if (value > max) {
-            log.warn("Received value {} which is greater than max recordable value {}, will be pinned to the max value",
+            log.debug("Received value {} which is greater than max recordable value {}, will be pinned to the max value",

Review comment:
   Why do we change these log levels to debug?

##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) throws IOException {
         if (!candidateState.isVoteGranted())
             throw new IllegalStateException("Cannot become leader without majority votes granted");
 
+        // Note that the leader does not retain the high watermark that was known
+        // in the previous state. The reason it does not is to protect the monotonicity

Review comment:
   `it does not` could be removed.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to sa