abbccdda commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r513200010
##
File path:
clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+protected final DataInputStream input;
+
+public DataInputStreamReadable(DataInputStream input) {
+this.input = input;
+}
+
+@Override
+public byte readByte() {
+try {
+return input.readByte();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public short readShort() {
+try {
Review comment:
nit: we could refactor the repeated try-catch logic (catching IOException and rethrowing it as a RuntimeException) into a shared helper method.
##
File path:
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
##
@@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) {
protected void update(Sample sample, MetricConfig config, double value,
long timeMs) {
final double boundedValue;
if (value > max) {
-log.warn("Received value {} which is greater than max recordable
value {}, will be pinned to the max value",
+log.debug("Received value {} which is greater than max recordable
value {}, will be pinned to the max value",
Review comment:
Why are we changing the level of these log statements from warn to debug?
##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset)
throws IOException {
if (!candidateState.isVoteGranted())
throw new IllegalStateException("Cannot become leader without
majority votes granted");
+// Note that the leader does not retain the high watermark that was
known
+// in the previous state. The reason it does not is to protect the
monotonicity
Review comment:
The words `it does not` are redundant and could be removed.
##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
final LogOffsetMetadata endOffsetMetadata = log.endOffset();
if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
}
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset,
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
}
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
long currentTimeMs
) {
state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset,
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
});
}
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts,
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to sa