This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new b4b57dbcff KAFKA-13888; Implement `LastFetchTimestamp` and `LastCaughtUpTimestamp` for DescribeQuorumResponse [KIP-836] (#12508)
b4b57dbcff is described below

commit b4b57dbcffdf54583e75b6cdedbf0a4fa941981a
Author: Niket <niket-g...@users.noreply.github.com>
AuthorDate: Fri Aug 19 15:09:09 2022 -0700

    KAFKA-13888; Implement `LastFetchTimestamp` and `LastCaughtUpTimestamp` for DescribeQuorumResponse [KIP-836] (#12508)
    
    This commit implements the newly added fields `LastFetchTimestamp` and `LastCaughtUpTimestamp` for KIP-836: https://cwiki.apache.org/confluence/display/KAFKA/KIP-836:+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag.
    
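    For reference, the new per-replica fields surface through the Admin client's
    describeMetadataQuorum API that the updated tests exercise. A minimal sketch of
    reading them, assuming a reachable KRaft cluster (the bootstrap address below is
    a placeholder, not part of this change):
    
        import java.util.Properties;
        
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.DescribeMetadataQuorumOptions;
        import org.apache.kafka.clients.admin.QuorumInfo;
        
        public class DescribeQuorumExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                // Placeholder bootstrap address; point this at any broker of the cluster.
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    QuorumInfo quorumInfo = admin
                        .describeMetadataQuorum(new DescribeMetadataQuorumOptions())
                        .quorumInfo()
                        .get();
                    System.out.println("Leader: " + quorumInfo.leaderId());
                    for (QuorumInfo.ReplicaState voter : quorumInfo.voters()) {
                        // lastFetchTimeMs/lastCaughtUpTimeMs are OptionalLong values carrying the new fields.
                        System.out.printf("voter %d: logEndOffset=%d lastFetchTimeMs=%s lastCaughtUpTimeMs=%s%n",
                            voter.replicaId(), voter.logEndOffset(),
                            voter.lastFetchTimeMs(), voter.lastCaughtUpTimeMs());
                    }
                }
            }
        }
    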
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/network/RequestChannel.scala  |   9 +-
 .../kafka/server/KRaftClusterTest.scala            |  55 +++++++-
 .../kafka/server/DescribeQuorumRequestTest.scala   |   9 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  16 +--
 .../java/org/apache/kafka/raft/LeaderState.java    |  94 ++++++++++----
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  22 +++-
 .../org/apache/kafka/raft/LeaderStateTest.java     | 140 +++++++++++++++++----
 .../kafka/raft/internals/KafkaRaftMetricsTest.java |   2 +-
 8 files changed, 281 insertions(+), 66 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4fa611206a..2200757c70 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -121,10 +121,17 @@ object RequestChannel extends Logging {
 
     def isForwarded: Boolean = envelope.isDefined
 
+    private def shouldReturnNotController(response: AbstractResponse): Boolean = {
+      response match {
+        case describeQuorumResponse: DescribeQuorumResponse => response.errorCounts.containsKey(Errors.NOT_LEADER_OR_FOLLOWER)
+        case _ => response.errorCounts.containsKey(Errors.NOT_CONTROLLER)
+      }
+    }
+
     def buildResponseSend(abstractResponse: AbstractResponse): Send = {
       envelope match {
         case Some(request) =>
-          val envelopeResponse = if (abstractResponse.errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+          val envelopeResponse = if (shouldReturnNotController(abstractResponse)) {
             // Since it's a NOT_CONTROLLER error response, we need to make envelope response with NOT_CONTROLLER error
             // to notify the requester (i.e. BrokerToControllerRequestThread) to update active controller
             new EnvelopeResponse(new EnvelopeResponseData()
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 509facf921..a16cf821d4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -21,7 +21,7 @@ import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, Config, ConfigEntry, NewPartitionReassignment, NewTopic}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, NewPartitionReassignment, NewTopic}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.message.DescribeClusterRequestData
 import org.apache.kafka.common.network.ListenerName
@@ -32,7 +32,7 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Tag, Test, Timeout}
 
 import java.util
-import java.util.{Arrays, Collections, Optional}
+import java.util.{Arrays, Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
@@ -778,4 +778,55 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+  def createAdminClient(cluster: KafkaClusterTestKit): Admin = {
+    var props: Properties = null
+    props = cluster.clientProperties()
+    props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+    Admin.create(props)
+  }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val admin = createAdminClient(cluster)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo.get()
+
+        assertEquals(cluster.controllers.asScala.keySet, quorumInfo.voters.asScala.map(_.replicaId).toSet)
+        assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
+          s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
+
+        quorumInfo.voters.forEach { voter =>
+          assertTrue(0 < voter.logEndOffset,
+            s"logEndOffset for voter with ID ${voter.replicaId} was 
${voter.logEndOffset}")
+          assertNotEquals(OptionalLong.empty(), voter.lastFetchTimeMs)
+          assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimeMs)
+        }
+
+        assertEquals(cluster.brokers.asScala.keySet, quorumInfo.observers.asScala.map(_.replicaId).toSet)
+        quorumInfo.observers.forEach { observer =>
+          assertTrue(0 < observer.logEndOffset,
+            s"logEndOffset for observer with ID ${observer.replicaId} was 
${observer.logEndOffset}")
+          assertNotEquals(OptionalLong.empty(), observer.lastFetchTimeMs)
+          assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimeMs)
+        }
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index b53004b2ea..eed58961e4 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -87,8 +87,13 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
 
       (voterData ++ observerData).foreach { state =>
         assertTrue(0 < state.logEndOffset)
-        assertEquals(-1, state.lastFetchTimestamp)
-        assertEquals(-1, state.lastCaughtUpTimestamp)
+        if (version == 0) {
+          assertEquals(-1, state.lastFetchTimestamp)
+          assertEquals(-1, state.lastCaughtUpTimestamp)
+        } else {
+          assertNotEquals(-1, state.lastFetchTimestamp)
+          assertNotEquals(-1, state.lastCaughtUpTimestamp)
+        }
       }
     }
   }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 042a141a76..69d2025b6b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.DescribeQuorumRequestData;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
-import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochRequestData;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchRequestData;
@@ -95,7 +94,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
@@ -1016,7 +1014,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                 LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
 
-                if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
+                if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata, log.endOffset().offset)) {
                     onUpdateLeaderHighWatermark(state, currentTimeMs);
                 }
 
@@ -1182,8 +1180,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             leaderState.localId(),
             leaderState.epoch(),
             leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1,
-            convertToReplicaStates(leaderState.getVoterEndOffsets()),
-            convertToReplicaStates(leaderState.getObserverStates(currentTimeMs))
+            leaderState.quorumResponseVoterStates(currentTimeMs),
+            leaderState.quorumResponseObserverStates(currentTimeMs)
         );
     }
 
@@ -1421,14 +1419,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> 
{
         return true;
     }
 
-    List<ReplicaState> convertToReplicaStates(Map<Integer, Long> 
replicaEndOffsets) {
-        return replicaEndOffsets.entrySet().stream()
-                   .map(entry -> new ReplicaState()
-                                     .setReplicaId(entry.getKey())
-                                     .setLogEndOffset(entry.getValue()))
-                   .collect(Collectors.toList());
-    }
-
     private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
         // Only elected leaders are sent in the request/response header, so if we have an elected
         // leaderId, it should be consistent with what is in the message.
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 8717d4e4d2..0b8ebad8bd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.slf4j.Logger;
@@ -25,6 +26,7 @@ import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.record.ControlRecordUtils;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -205,10 +207,10 @@ public class LeaderState<T> implements EpochState {
     /**
      * Update the local replica state.
      *
-     * See {@link #updateReplicaState(int, long, LogOffsetMetadata)}
+     * See {@link #updateReplicaState(int, long, LogOffsetMetadata, long)}
      */
     public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffsetMetadata) {
-        return updateReplicaState(localId, fetchTimestamp, logOffsetMetadata);
+        return updateReplicaState(localId, fetchTimestamp, logOffsetMetadata, logOffsetMetadata.offset);
     }
 
     /**
@@ -217,11 +219,15 @@ public class LeaderState<T> implements EpochState {
      * @param replicaId replica id
      * @param fetchTimestamp fetch timestamp
      * @param logOffsetMetadata new log offset and metadata
+     * @param leaderLogEndOffset current log end offset of the leader
      * @return true if the high watermark is updated too
      */
-    public boolean updateReplicaState(int replicaId,
-                                      long fetchTimestamp,
-                                      LogOffsetMetadata logOffsetMetadata) {
+    public boolean updateReplicaState(
+        int replicaId,
+        long fetchTimestamp,
+        LogOffsetMetadata logOffsetMetadata,
+        long leaderLogEndOffset
+    ) {
         // Ignore fetches from negative replica id, as it indicates
         // the fetch is from non-replica. For example, a consumer.
         if (replicaId < 0) {
@@ -229,7 +235,18 @@ public class LeaderState<T> implements EpochState {
         }
 
         ReplicaState state = getReplicaState(replicaId);
-        state.updateFetchTimestamp(fetchTimestamp);
+
+        // Only proceed with updating the states if the offset update is valid
+        verifyEndOffsetUpdate(state, logOffsetMetadata);
+
+        // Update the Last CaughtUp Time
+        if (logOffsetMetadata.offset >= leaderLogEndOffset) {
+            state.updateLastCaughtUpTimestamp(fetchTimestamp);
+        } else if (logOffsetMetadata.offset >= 
state.lastFetchLeaderLogEndOffset.orElse(-1L)) {
+            
state.updateLastCaughtUpTimestamp(state.lastFetchTimestamp.orElse(-1L));
+        }
+
+        state.updateFetchTimestamp(fetchTimestamp, leaderLogEndOffset);
         return updateEndOffset(state, logOffsetMetadata);
     }
 
@@ -246,8 +263,10 @@ public class LeaderState<T> implements EpochState {
             .collect(Collectors.toList());
     }
 
-    private boolean updateEndOffset(ReplicaState state,
-                                    LogOffsetMetadata endOffsetMetadata) {
+    private void verifyEndOffsetUpdate(
+        ReplicaState state,
+        LogOffsetMetadata endOffsetMetadata
+    ) {
         state.endOffset.ifPresent(currentEndOffset -> {
             if (currentEndOffset.offset > endOffsetMetadata.offset) {
                 if (state.nodeId == localId) {
@@ -259,7 +278,11 @@ public class LeaderState<T> implements EpochState {
                 }
             }
         });
-
+    }
+    private boolean updateEndOffset(
+        ReplicaState state,
+        LogOffsetMetadata endOffsetMetadata
+    ) {
         state.endOffset = Optional.of(endOffsetMetadata);
         state.hasAcknowledgedLeader = true;
         return isVoter(state.nodeId) && updateHighWatermark();
@@ -290,22 +313,36 @@ public class LeaderState<T> implements EpochState {
         return state;
     }
 
-    Map<Integer, Long> getVoterEndOffsets() {
-        return getReplicaEndOffsets(voterStates);
+    List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseVoterStates(long currentTimeMs) {
+        return quorumResponseReplicaStates(voterStates.values(), localId, 
currentTimeMs);
     }
 
-    Map<Integer, Long> getObserverStates(final long currentTimeMs) {
+    List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseObserverStates(long currentTimeMs) {
         clearInactiveObservers(currentTimeMs);
-        return getReplicaEndOffsets(observerStates);
+        return quorumResponseReplicaStates(observerStates.values(), localId, 
currentTimeMs);
     }
 
-    private static <R extends ReplicaState> Map<Integer, Long> 
getReplicaEndOffsets(
-        Map<Integer, R> replicaStates) {
-        return replicaStates.entrySet().stream()
-                   .collect(Collectors.toMap(Map.Entry::getKey,
-                       e -> e.getValue().endOffset.map(
-                           logOffsetMetadata -> 
logOffsetMetadata.offset).orElse(-1L))
-                   );
+    private static  List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseReplicaStates(
+        Collection<ReplicaState> state,
+        int leaderId,
+        long currentTimeMs
+    ) {
+        return state.stream().map(s -> {
+            final long lastCaughtUpTimestamp;
+            final long lastFetchTimestamp;
+            if (s.nodeId == leaderId) {
+                lastCaughtUpTimestamp = currentTimeMs;
+                lastFetchTimestamp = currentTimeMs;
+            } else {
+                lastCaughtUpTimestamp = s.lastCaughtUpTimestamp.orElse(-1);
+                lastFetchTimestamp = s.lastFetchTimestamp.orElse(-1);
+            }
+            return new DescribeQuorumResponseData.ReplicaState()
+                    .setReplicaId(s.nodeId)
+                    .setLogEndOffset(s.endOffset.map(md -> 
md.offset).orElse(-1L))
+                    .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
+                    .setLastFetchTimestamp(lastFetchTimestamp);
+        }).collect(Collectors.toList());
     }
 
     private void clearInactiveObservers(final long currentTimeMs) {
@@ -323,19 +360,30 @@ public class LeaderState<T> implements EpochState {
         final int nodeId;
         Optional<LogOffsetMetadata> endOffset;
         OptionalLong lastFetchTimestamp;
+        OptionalLong lastFetchLeaderLogEndOffset;
+        OptionalLong lastCaughtUpTimestamp;
         boolean hasAcknowledgedLeader;
 
         public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
             this.nodeId = nodeId;
             this.endOffset = Optional.empty();
             this.lastFetchTimestamp = OptionalLong.empty();
+            this.lastFetchLeaderLogEndOffset = OptionalLong.empty();
+            this.lastCaughtUpTimestamp = OptionalLong.empty();
             this.hasAcknowledgedLeader = hasAcknowledgedLeader;
         }
 
-        void updateFetchTimestamp(long currentFetchTimeMs) {
+        void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) {
             // To be resilient to system time shifts we do not strictly
             // require the timestamp be monotonically increasing.
             lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs));
+            lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
+        }
+
+        void updateLastCaughtUpTimestamp(long lastCaughtUpTime) {
+            // This value relies on the fetch timestamp which does not
+            // require monotonicity
+            lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime));
         }
 
         @Override
@@ -353,10 +401,12 @@ public class LeaderState<T> implements EpochState {
         @Override
         public String toString() {
             return String.format(
-                "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, 
hasAcknowledgedLeader=%s)",
+                "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, 
" +
+                        "lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)",
                 nodeId,
                 endOffset,
                 lastFetchTimestamp,
+                lastCaughtUpTimestamp,
                 hasAcknowledgedLeader 
             );
         }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index a8a346e6db..678648505b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1985,6 +1985,7 @@ public class KafkaRaftClientTest {
 
         RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
 
+        long laggingFollowerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(1L, epoch);
@@ -1992,16 +1993,21 @@ public class KafkaRaftClientTest {
         context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
         context.client.poll();
 
+        context.time.sleep(100);
+        long closeFollowerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(3L, epoch);
 
         // Create observer
         int observerId = 3;
+        context.time.sleep(100);
+        long observerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0));
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(3L, epoch);
 
+        context.time.sleep(100);
         context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition));
         context.pollUntilResponse();
 
@@ -2011,17 +2017,25 @@ public class KafkaRaftClientTest {
                     .setReplicaId(localId)
                     // As we are appending the records directly to the log,
                     // the leader end offset hasn't been updated yet.
-                    .setLogEndOffset(3L),
+                    .setLogEndOffset(3L)
+                    .setLastFetchTimestamp(context.time.milliseconds())
+                    .setLastCaughtUpTimestamp(context.time.milliseconds()),
                 new ReplicaState()
                     .setReplicaId(laggingFollower)
-                    .setLogEndOffset(1L),
+                    .setLogEndOffset(1L)
+                    .setLastFetchTimestamp(laggingFollowerFetchTime)
+                    .setLastCaughtUpTimestamp(laggingFollowerFetchTime),
                 new ReplicaState()
                     .setReplicaId(closeFollower)
-                    .setLogEndOffset(3)),
+                    .setLogEndOffset(3L)
+                    .setLastFetchTimestamp(closeFollowerFetchTime)
+                    .setLastCaughtUpTimestamp(closeFollowerFetchTime)),
             singletonList(
                 new ReplicaState()
                     .setReplicaId(observerId)
-                    .setLogEndOffset(0L)));
+                    .setLogEndOffset(0L)
+                    .setLastFetchTimestamp(observerFetchTime)
+                    .setLastCaughtUpTimestamp(-1L)));
     }
 
     @Test
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 5f9989d55e..fa54d5cbc6 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
@@ -31,7 +32,9 @@ import java.util.Collections;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -119,6 +122,93 @@ public class LeaderStateTest {
             () -> state.updateLocalState(0, new LogOffsetMetadata(15L)));
     }
 
+    @Test
+    public void testLastCaughtUpTimeVoters() {
+        int node1 = 1;
+        int node2 = 2;
+        int currentTime = 1000;
+        int fetchTime = 0;
+        int caughtupTime = -1;
+        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 
10L);
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L)));
+        assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        // Node 1 falls behind
+        assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L), 11L));
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp());
+
+        // Node 1 catches up to leader
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L), 11L));
+        caughtupTime = fetchTime;
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp());
+
+        // Node 1 falls behind
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L), 100L));
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp());
+
+        // Node 1 catches up to the last fetch offset
+        int prevFetchTime = fetchTime;
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L), 200L));
+        caughtupTime = prevFetchTime;
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp());
+
+        // Node2 has never caught up to leader
+        assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp());
+        assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(202L), 300L));
+        assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp());
+        assertFalse(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L), 300L));
+        assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp());
+    }
+
+    @Test
+    public void testLastCaughtUpTimeObserver() {
+        int node1Index = 0;
+        int node1Id = 1;
+        int currentTime = 1000;
+        int fetchTime = 0;
+        int caughtUpTime = -1;
+        LeaderState<?> state = newLeaderState(singleton(localId), 5L);
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertEquals(emptySet(), state.nonAcknowledgingVoters());
+
+        // Node 1 falls behind
+        assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(10L), 11L));
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp());
+
+        // Node 1 catches up to leader
+        assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(11L), 11L));
+        caughtUpTime = fetchTime;
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp());
+
+        // Node 1 falls behind
+        assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(50L), 100L));
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp());
+
+        // Node 1 catches up to the last fetch offset
+        int prevFetchTime = fetchTime;
+        assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(102L), 200L));
+        caughtUpTime = prevFetchTime;
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp());
+
+        // Node 1 catches up to leader
+        assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(202L), 200L));
+        caughtUpTime = fetchTime;
+        assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp());
+        assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp());
+    }
+
     @Test
     public void testIdempotentEndOffsetUpdate() {
         LeaderState<?> state = newLeaderState(singleton(localId), 15L);
@@ -149,12 +239,12 @@ public class LeaderStateTest {
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(13L)));
         assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L), 11L));
         assertEquals(emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L)));
+        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L), 12L));
         assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L)));
+        assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L), 14L));
         assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark());
     }
 
@@ -166,19 +256,19 @@ public class LeaderStateTest {
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
         assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L), 11L));
         assertEquals(singleton(node2), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L), 11L));
         assertEquals(emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L)));
+        assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L), 16L));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
-        assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L)));
+        assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L), 21L));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L)));
+        assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L), 21L));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark());
     }
 
@@ -188,13 +278,13 @@ public class LeaderStateTest {
         int node1 = 1;
         LeaderState<?> state = newLeaderState(mkSet(localId, node1), 0L);
         state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L));
-        state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L));
+        state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L), 11L);
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
 
         // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state.
         // The leader will report an error in the logs, but will not let the high watermark rewind
-        assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L)));
-        assertEquals(5L, state.getVoterEndOffsets().get(node1));
+        assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L), 11L));
+        assertEquals(5L, state.quorumResponseVoterStates(time.milliseconds()).get(node1).logEndOffset());
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark());
     }
 
@@ -224,7 +314,9 @@ public class LeaderStateTest {
             mkEntry(localId, leaderEndOffset),
             mkEntry(node1, leaderStartOffset),
             mkEntry(node2, leaderEndOffset)
-        ), state.getVoterEndOffsets());
+        ), state.quorumResponseVoterStates(0)
+            .stream()
+            .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
     }
 
     private LeaderState<?> setUpLeaderAndFollowers(int follower1,
@@ -234,8 +326,8 @@ public class LeaderStateTest {
         LeaderState<?> state = newLeaderState(mkSet(localId, follower1, 
follower2), leaderStartOffset);
         state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset));
-        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset));
+        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset), leaderEndOffset);
+        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset), leaderEndOffset);
         return state;
     }
 
@@ -246,9 +338,12 @@ public class LeaderStateTest {
 
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
         long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset)));
+        assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
 
-        assertEquals(Collections.singletonMap(observerId, epochStartOffset), state.getObserverStates(timestamp));
+        assertEquals(Collections.singletonMap(observerId, epochStartOffset),
+                state.quorumResponseObserverStates(timestamp)
+                    .stream()
+                    .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset)));
     }
 
     @Test
@@ -257,9 +352,9 @@ public class LeaderStateTest {
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
-        assertFalse(state.updateReplicaState(observerId, 0, new 
LogOffsetMetadata(epochStartOffset)));
+        assertFalse(state.updateReplicaState(observerId, 0, new 
LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
 
-        assertEquals(Collections.emptyMap(), state.getObserverStates(10));
+        assertEquals(emptyList(), state.quorumResponseObserverStates(10));
     }
 
     @Test
@@ -269,11 +364,14 @@ public class LeaderStateTest {
         long epochStartOffset = 10L;
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
 
-        state.updateReplicaState(observerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset));
-        assertEquals(singleton(observerId), state.getObserverStates(time.milliseconds()).keySet());
+        state.updateReplicaState(observerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10);
+        assertEquals(singleton(observerId),
+                state.quorumResponseObserverStates(time.milliseconds())
+                    .stream().map(o -> o.replicaId())
+                    .collect(Collectors.toSet()));
 
         time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
-        assertEquals(emptySet(), state.getObserverStates(time.milliseconds()).keySet());
+        assertEquals(emptyList(), state.quorumResponseObserverStates(time.milliseconds()));
     }
 
     @ParameterizedTest
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index 0d64eac1cc..cc2700bb17 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -103,7 +103,7 @@ public class KafkaRaftMetricsTest {
         assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
 
         state.leaderStateOrThrow().updateLocalState(0, new LogOffsetMetadata(5L));
-        state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L));
+        state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L), 6L);
         assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue());
 
         state.transitionToFollower(2, 1);
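
For a quick illustration of the rule the LeaderState change above introduces: a replica's
lastCaughtUpTimestamp advances to the current fetch time when its fetch offset has reached the
leader's log end offset, and otherwise only up to the previous fetch time once the replica passes
the leader end offset recorded at that previous fetch. A minimal, self-contained sketch of that
bookkeeping (FollowerProgress is an illustrative name, not a Kafka class):

    import java.util.OptionalLong;

    // Illustrative model of the per-replica bookkeeping added to LeaderState above.
    final class FollowerProgress {
        OptionalLong lastFetchTimestamp = OptionalLong.empty();
        OptionalLong lastFetchLeaderLogEndOffset = OptionalLong.empty();
        OptionalLong lastCaughtUpTimestamp = OptionalLong.empty();

        void onFetch(long fetchTimeMs, long fetchOffset, long leaderLogEndOffset) {
            if (fetchOffset >= leaderLogEndOffset) {
                // Caught up to the leader as of this fetch.
                lastCaughtUpTimestamp = maxOf(lastCaughtUpTimestamp, fetchTimeMs);
            } else if (fetchOffset >= lastFetchLeaderLogEndOffset.orElse(-1L)) {
                // Caught up to where the leader was at the previous fetch.
                lastCaughtUpTimestamp = maxOf(lastCaughtUpTimestamp, lastFetchTimestamp.orElse(-1L));
            }
            lastFetchTimestamp = maxOf(lastFetchTimestamp, fetchTimeMs);
            lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
        }

        private static OptionalLong maxOf(OptionalLong current, long candidate) {
            return OptionalLong.of(Math.max(current.orElse(-1L), candidate));
        }
    }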
