jsancio commented on code in PR #17807:
URL: https://github.com/apache/kafka/pull/17807#discussion_r1882485059


##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -37,6 +37,10 @@ public class FollowerState implements EpochState {
     private final Set<Integer> voters;
     // Used for tracking the expiration of both the Fetch and FetchSnapshot 
requests
     private final Timer fetchTimer;
+    /* Used to track if we have fetched from the leader at least once since 
the transition to follower in this epoch.
+     * If we have not yet fetched from the leader, we may grant PreVotes.
+     */
+    private boolean hasFetchedFromLeader;

Review Comment:
   This tracks whether the replica has fetched successfully, not just whether it has fetched.



##########
raft/src/main/java/org/apache/kafka/raft/ElectionState.java:
##########
@@ -29,7 +29,7 @@
  * Encapsulate election state stored on disk after every state change.
  */
 public final class ElectionState {
-    private static final int UNKNOWN_LEADER_ID = -1;
+    static final int UNKNOWN_LEADER_ID = -1;

Review Comment:
   Why did you make this change? Doesn't look like you use this field anywhere 
in this diff?



##########
raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java:
##########
@@ -83,35 +83,67 @@ public void testElectionTimeout() {
     public void testCanGrantVoteWithoutDirectoryId(boolean isLogUpToDate) {
         UnattachedState state = 
newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID);
 
-        assertTrue(
-            state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantPreVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
         );
-        assertTrue(
-            state.canGrantVote(
-                ReplicaKey.of(votedId, Uuid.randomUuid()),
-                isLogUpToDate
-            )
+        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));

Review Comment:
   Interesting that pre-vote can be false when normal vote is true.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
electedLeaderId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)
+            .build();
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, electedLeaderId, 
MemoryRecords.EMPTY, 0L, Errors.NONE)
+            );
+        }
+
+        // follower should reject pre-vote requests with the same epoch if it 
has successfully fetched from the leader
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // follower will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
+        assertEquals(context.currentEpoch(), epoch + 1);
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsFollowerWithVotedCandidate() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        ReplicaKey votedCandidateKey = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
votedCandidateKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(epoch, votedCandidateKey)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        // follower can grant pre-votes if it has not fetched successfully 
from leader yet
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(epoch, votedCandidateKey.id());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsCandidate() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int leaderEpoch = 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
+            .withKip853Rpc(true)
+            .build();
+
+        context.pollUntilRequest();

Review Comment:
   Do you need this? Why are you waiting for a request to be sent by the 
replica? What is this request? Fetch because of your changes in the other PR?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
electedLeaderId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)
+            .build();
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, electedLeaderId, 
MemoryRecords.EMPTY, 0L, Errors.NONE)
+            );
+        }
+
+        // follower should reject pre-vote requests with the same epoch if it 
has successfully fetched from the leader
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // follower will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
+        assertEquals(context.currentEpoch(), epoch + 1);
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsFollowerWithVotedCandidate() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        ReplicaKey votedCandidateKey = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
votedCandidateKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(epoch, votedCandidateKey)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        // follower can grant pre-votes if it has not fetched successfully 
from leader yet
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(epoch, votedCandidateKey.id());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsCandidate() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int leaderEpoch = 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
+            .withKip853Rpc(true)
+            .build();
+
+        context.pollUntilRequest();
+
+        // candidate should grant pre-vote requests with the same epoch if log 
is up-to-date
+        context.deliverRequest(context.preVoteRequest(leaderEpoch, 
otherNodeKey, leaderEpoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(leaderEpoch, localId);
+
+        // candidate will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(leaderEpoch + 1, 
otherNodeKey, leaderEpoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch + 1, 
OptionalInt.of(-1), true);
+        assertTrue(context.client.quorum().isUnattached());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedObserver() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if same replica sends another pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if different replica sends a pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 
1));
+        context.pollUntilResponse();

Review Comment:
   We should also test that a replica outside of the voter set can send a 
pre-vote request and it will be granted accordingly.
   
   This comment applies to a few of the other tests.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
electedLeaderId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)
+            .build();
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, electedLeaderId, 
MemoryRecords.EMPTY, 0L, Errors.NONE)
+            );
+        }
+
+        // follower should reject pre-vote requests with the same epoch if it 
has successfully fetched from the leader
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // follower will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
+        assertEquals(context.currentEpoch(), epoch + 1);
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsFollowerWithVotedCandidate() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        ReplicaKey votedCandidateKey = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
votedCandidateKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(epoch, votedCandidateKey)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        // follower can grant pre-votes if it has not fetched successfully 
from leader yet
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(epoch, votedCandidateKey.id());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsCandidate() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int leaderEpoch = 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
+            .withKip853Rpc(true)
+            .build();
+
+        context.pollUntilRequest();
+
+        // candidate should grant pre-vote requests with the same epoch if log 
is up-to-date
+        context.deliverRequest(context.preVoteRequest(leaderEpoch, 
otherNodeKey, leaderEpoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(leaderEpoch, localId);
+
+        // candidate will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(leaderEpoch + 1, 
otherNodeKey, leaderEpoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch + 1, 
OptionalInt.of(-1), true);
+        assertTrue(context.client.quorum().isUnattached());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedObserver() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if same replica sends another pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if different replica sends a pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedVoted() throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(epoch, replica2)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if same replica sends another pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if different replica sends a pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedWithLeader() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        ReplicaKey leader = replicaKey(localId + 3, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leader.id())
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(leader.id()), true);
+
+        // if same replica sends another pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(leader.id()), true);
+
+        // if different replica sends a pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(leader.id()), true);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerObserver(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        int leaderId = localId + 1;
+        ReplicaKey leader = replicaKey(leaderId, true);
+        ReplicaKey follower = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(leader.id(), follower.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leader.id())
+            .withKip853Rpc(true)
+            .build();
+        context.assertElectedLeader(epoch, leader.id());
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 
0L, Errors.NONE)
+            );
+        }
+
+        context.deliverRequest(context.preVoteRequest(epoch, follower, epoch, 
1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(leaderId), voteGranted);
+        assertTrue(context.client.quorum().isFollower());
+    }
+
+    @Test
+    public void testHandleInvalidPreVoteRequestWithOlderEpoch() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch - 1, otherNodeKey, 
epoch - 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, 
OptionalInt.empty(), false);
+        context.assertUnknownLeader(epoch);
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsObserver() throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int otherNodeId2 = localId + 2;
+        Set<Integer> voters = Set.of(otherNodeKey.id(), otherNodeId2);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+    }
+
+    @Test
+    public void testLeaderIgnorePreVoteRequestOnSameEpoch() throws Exception {

Review Comment:
   Ignore or reject pre-vote requests?



##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -1818,9 +1839,9 @@ private short fetchSnapshotRpcVersion() {
         }
     }
 
-    private short voteRpcVersion() {
+    short voteRpcVersion() {
         if (kip853Rpc) {
-            return 1;
+            return ApiKeys.VOTE.latestVersion();

Review Comment:
   I see. Are you planning to fix/change this in a future PR?



##########
raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java:
##########
@@ -83,35 +83,67 @@ public void testElectionTimeout() {
     public void testCanGrantVoteWithoutDirectoryId(boolean isLogUpToDate) {
         UnattachedState state = 
newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID);
 
-        assertTrue(
-            state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantPreVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
         );
-        assertTrue(
-            state.canGrantVote(
-                ReplicaKey.of(votedId, Uuid.randomUuid()),
-                isLogUpToDate
-            )
+        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantPreVote(ReplicaKey.of(votedId, Uuid.randomUuid()), 
isLogUpToDate)
         );
+        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
Uuid.randomUuid()), isLogUpToDate));
 
-        assertFalse(
-            state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
+        // Can grant PreVote to other replicas even if we have granted a 
standard vote to another replica
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantPreVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
         );
+        assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
     }
 
-    @Test
-    void testCanGrantVoteWithDirectoryId() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCanGrantVoteWithDirectoryId(boolean isLogUpToDate) {
         Uuid votedDirectoryId = Uuid.randomUuid();
         UnattachedState state = newUnattachedVotedState(votedDirectoryId);
 
-        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
votedDirectoryId), false));
+        // Same voterKey
+        // We will not grant PreVote for a replica we have already granted a 
standard vote to if their log is behind
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantPreVote(ReplicaKey.of(votedId, votedDirectoryId), 
isLogUpToDate)
+        );

Review Comment:
   This comment doesn't match the check.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {

Review Comment:
   This suite doesn't seem to test the case when the remote replica doesn't 
have a log that is up-to-date with the local replica.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
electedLeaderId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)
+            .build();
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, electedLeaderId, 
MemoryRecords.EMPTY, 0L, Errors.NONE)
+            );
+        }
+
+        // follower should reject pre-vote requests with the same epoch if it 
has successfully fetched from the leader
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // follower will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
+        assertEquals(context.currentEpoch(), epoch + 1);
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsFollowerWithVotedCandidate() throws 
Exception {

Review Comment:
   What does "AsFollowerWithVotedCandidate" mean? Follower and UnattachedVoted 
are mutually exclusive states in the KRaft implementation.



##########
raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java:
##########
@@ -92,6 +92,19 @@ public void testMonotonicHighWatermark() {
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testPreVoteIfHasNotFetchedFromLeaderYet(boolean isLogUpToDate) 
{

Review Comment:
   What about the case when the follower has successfully fetched from the 
leader?



##########
clients/src/main/resources/common/message/VoteRequest.json:
##########
@@ -35,18 +36,20 @@
           "versions": "0+", "about": "The partition data.", "fields": [
             { "name": "PartitionIndex", "type": "int32", "versions": "0+",
               "about": "The partition index." },
-            { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
-              "about": "The bumped epoch of the candidate sending the 
request."},
-            { "name": "CandidateId", "type": "int32", "versions": "0+", 
"entityType": "brokerId",
-              "about": "The replica id of the voter sending the request."},
-            { "name": "CandidateDirectoryId", "type": "uuid", "versions": 
"1+", "ignorable": true,
-              "about": "The directory id of the voter sending the request." },
+            { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
+              "about": "The epoch of the voter sending the request"},
+            { "name": "ReplicaId", "type": "int32", "versions": "0+", 
"entityType": "brokerId",
+              "about": "The replica id of the voter sending the request"},
+            { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "1+", 
"ignorable": true,
+              "about": "The directory id of the voter sending the request" },
             { "name": "VoterDirectoryId", "type": "uuid", "versions": "1+", 
"ignorable": true,
-              "about": "The ID of the voter sending the request."},
+              "about": "The directory id of the voter receiving the request"},
             { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
               "about": "The epoch of the last record written to the metadata 
log."},
             { "name": "LastOffset", "type": "int64", "versions": "0+",
-              "about": "The offset of the last record written to the metadata 
log."}
+              "about": "The offset of the last record written to the metadata 
log."},

Review Comment:
   Okay. Outside the scope of this PR but this is not correct. The replica 
sends the end offset and the receiving replica interprets this value as the end 
offset.



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
electedLeaderId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)

Review Comment:
   Are you planning to fix this in the next PR? Why not fix it now?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java:
##########
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
electedLeaderId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)
+            .build();
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, electedLeaderId, 
MemoryRecords.EMPTY, 0L, Errors.NONE)
+            );
+        }
+
+        // follower should reject pre-vote requests with the same epoch if it 
has successfully fetched from the leader
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // follower will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
+        assertEquals(context.currentEpoch(), epoch + 1);
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsFollowerWithVotedCandidate() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        ReplicaKey votedCandidateKey = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), 
votedCandidateKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(epoch, votedCandidateKey)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+
+        // follower can grant pre-votes if it has not fetched successfully 
from leader yet
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(epoch, votedCandidateKey.id());
+    }
+
+    @Test
+    public void testHandlePreVoteRequestAsCandidate() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int leaderEpoch = 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
+            .withKip853Rpc(true)
+            .build();
+
+        context.pollUntilRequest();
+
+        // candidate should grant pre-vote requests with the same epoch if log 
is up-to-date
+        context.deliverRequest(context.preVoteRequest(leaderEpoch, 
otherNodeKey, leaderEpoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(leaderEpoch, localId);

Review Comment:
   Do you also want to assert that the replica is in the Candidate state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to