jsancio commented on code in PR #18240: URL: https://github.com/apache/kafka/pull/18240#discussion_r1917177912
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java: ########## @@ -573,4 +700,450 @@ public void testRejectPreVoteIfRemoteLogIsNotUpToDate() throws Exception { assertTrue(context.client.quorum().isUnattached()); context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false); } + + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testPreVoteResponseIgnoredAfterBecomingFollower(KRaftVersion kraftVersion) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey voter2 = replicaKey(localId + 1, true); + ReplicaKey voter3 = replicaKey(localId + 2, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, voter2, voter3)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(KIP_996_PROTOCOL) + .build(); + + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure transition to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + + // Wait until the vote requests are inflight + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + + // While the vote requests are still inflight, replica receives a BeginEpoch for the same epoch + context.deliverRequest(context.beginEpochRequest(epoch, voter3.id())); + context.client.poll(); + context.assertElectedLeader(epoch, voter3.id()); + + // If PreVote responses are received now they should be ignored + VoteResponseData voteResponse1 = context.voteResponse(true, OptionalInt.empty(), epoch); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + voteResponse1 + ); + + VoteResponseData voteResponse2 = context.voteResponse(true, OptionalInt.of(voter3.id()), 
epoch); + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + voteResponse2 + ); + + context.client.poll(); + context.assertElectedLeader(epoch, voter3.id()); + } + + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testPreVoteNotSupportedByRemote(KRaftVersion kraftVersion) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey voter2Key = replicaKey(localId + 1, true); + ReplicaKey voter3Key = replicaKey(localId + 2, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, voter2Key, voter3Key)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(KIP_996_PROTOCOL) + .build(); + + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure transition to Prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + assertEquals(epoch, context.currentEpoch()); + assertTrue(context.client.quorum().isProspective()); + + // Simulate one remote node not supporting PreVote with UNSUPPORTED_VERSION response. + // Note: with the mocked network client we simulate this is a bit differently, in reality this response would + // be generated from the network client and not sent from the remote node. + List<RaftRequest.Outbound> voteRequests = context.collectPreVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ); + + // Local should transition to Candidate since it realizes remote node does not support PreVote. 
+ context.client.poll(); + assertEquals(epoch + 1, context.currentEpoch()); + context.client.quorum().isCandidate(); + + // Any further PreVote requests should be ignored + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + context.voteResponse(true, OptionalInt.empty(), epoch) + ); + context.client.poll(); + assertEquals(epoch + 1, context.currentEpoch()); + context.client.quorum().isCandidate(); + context.collectVoteRequests(epoch + 1, 0, 0); + + // Sleep to transition back to Prospective + context.time.sleep(context.client.quorum().candidateStateOrThrow().remainingElectionTimeMs(context.time.milliseconds())); + context.client.poll(); + assertEquals(epoch + 1, context.currentEpoch()); + assertTrue(context.client.quorum().isProspective()); + + // Simulate receiving enough valid PreVote responses for election to succeed + context.pollUntilRequest(); + voteRequests = context.collectPreVoteRequests(epoch + 1, 0, 0); + assertEquals(2, voteRequests.size()); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + context.voteResponse(true, OptionalInt.empty(), epoch + 1) + ); + context.client.poll(); + assertEquals(epoch + 2, context.currentEpoch()); + context.client.quorum().isCandidate(); + + // Any further PreVote responses should be ignored + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + RaftUtil.errorResponse(ApiKeys.VOTE, Errors.UNSUPPORTED_VERSION) + ); + context.client.poll(); + assertEquals(epoch + 2, context.currentEpoch()); + context.client.quorum().isCandidate(); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveReceivesBeginQuorumRequest( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey leader = replicaKey(localId + 
1, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, leader)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(raftProtocol) + .build(); + + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure transition to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + + assertTrue(context.client.quorum().isProspective()); + + context.deliverRequest(context.beginEpochRequest(epoch, leader.id())); + context.client.poll(); + + assertTrue(context.client.quorum().isFollower()); + context.assertElectedLeader(epoch, leader.id()); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveTransitionsToUnattachedOnElectionFailure( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey otherNode = replicaKey(localId + 1, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, otherNode)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(raftProtocol) + .build(); + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure that transition to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // If election timeout expires, replica should transition to unattached to attempt re-discovering leader + context.time.sleep(context.electionTimeoutMs() * 2L); + context.client.poll(); + assertTrue(context.client.quorum().isUnattached()); + + // After election times out again, replica will transition back to prospective and send PreVote requests + 
context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + RaftRequest.Outbound voteRequest = context.assertSentVoteRequest(epoch, 0, 0L, 1); + + // If prospective receives enough rejected votes, it also transitions to unattached immediately + context.deliverResponse( + voteRequest.correlationId(), + voteRequest.destination(), + context.voteResponse(false, OptionalInt.empty(), epoch)); + context.client.poll(); + assertTrue(context.client.quorum().isUnattached()); + + // After election times out again, replica will transition back to prospective and send PreVote requests + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + context.assertSentVoteRequest(epoch, 0, 0L, 1); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveWithLeaderTransitionsToFollower( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey replica1 = replicaKey(localId + 1, true); + ReplicaKey replica2 = replicaKey(localId + 2, true); + int epoch = 5; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, replica1, replica2)), + kraftVersion + ) + .withElectedLeader(epoch, replica1.id()) + .withRaftProtocol(raftProtocol) + .build(); + context.assertElectedLeader(epoch, replica1.id()); + assertTrue(context.client.quorum().isFollower()); + + // Sleep a little to ensure transition to prospective + context.time.sleep(context.fetchTimeoutMs); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + context.assertSentVoteRequest(epoch, 0, 0L, 2); + + // If election timeout expires, replica should transition back to follower if it hasn't found new leader yet + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + 
context.assertSentFetchRequest(); + assertTrue(context.client.quorum().isFollower()); + context.assertElectedLeader(epoch, replica1.id()); + + // After election times out again, replica will transition back to prospective and send PreVote requests + context.time.sleep(context.fetchTimeoutMs); + context.pollUntilRequest(); + List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + assertTrue(context.client.quorum().isProspective()); + context.assertElectedLeader(epoch, replica1.id()); + + // If prospective receives enough rejected votes without leaderId, it also transitions to follower immediately + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + context.voteResponse(false, OptionalInt.empty(), epoch)); + context.deliverResponse( + voteRequests.get(1).correlationId(), + voteRequests.get(1).destination(), + context.voteResponse(false, OptionalInt.empty(), epoch)); + // handle vote response and mark rejected vote + context.client.poll(); + // transition to follower after seeing election has failed + context.pollUntilRequest(); + assertTrue(context.client.quorum().isFollower()); + context.assertSentFetchRequest(); + + // After election times out again, transition back to prospective and send PreVote requests + context.time.sleep(context.fetchTimeoutMs); + context.pollUntilRequest(); + voteRequests = context.collectVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + assertTrue(context.client.quorum().isProspective()); + context.assertElectedLeader(epoch, replica1.id()); + + // If prospective receives vote response with different leaderId, it will transition to follower immediately + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + context.voteResponse(Errors.FENCED_LEADER_EPOCH, OptionalInt.of(replica2.id()), epoch + 1)); + context.client.poll(); + 
assertTrue(context.client.quorum().isFollower()); + context.assertElectedLeader(epoch + 1, replica2.id()); + } + + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testProspectiveLosesElectionHasLeaderButMissingEndpoint(KRaftVersion kraftVersion) throws Exception { + int localId = randomReplicaId(); + ReplicaKey local = replicaKey(localId, true); + ReplicaKey voter1 = replicaKey(localId + 1, true); + int electedLeaderId = localId + 3; + int epoch = 2; + + RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, voter1)), + kraftVersion + ) + .withElectedLeader(epoch, electedLeaderId) + .withRaftProtocol(KIP_996_PROTOCOL) + .build(); + context.assertElectedLeader(epoch, electedLeaderId); + assertTrue(context.client.quorum().isUnattached()); + // Sleep a little to ensure that we become a prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.client.poll(); + assertTrue(context.client.quorum().isProspective()); + + // Sleep past election timeout + context.time.sleep(context.electionTimeoutMs() * 2L); + context.client.poll(); + + // Prospective should transition to unattached + assertTrue(context.client.quorum().isUnattached()); + assertTrue(context.client.quorum().hasLeader()); + + // If election timeout expires again, it should transition back to prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.client.poll(); + assertTrue(context.client.quorum().isProspective()); + assertTrue(context.client.quorum().hasLeader()); + } + + @ParameterizedTest + @MethodSource("kraftVersionRaftProtocolCombinations") + public void testProspectiveWithoutLeaderTransitionsToFollower( + KRaftVersion kraftVersion, + RaftProtocol raftProtocol + ) throws Exception { + ReplicaKey local = replicaKey(randomReplicaId(), true); + ReplicaKey leader = replicaKey(local.id() + 1, true); + ReplicaKey follower = replicaKey(local.id() + 2, true); + int epoch = 5; + + 
RaftClientTestContext context = new RaftClientTestContext.Builder( + local, + VoterSetTest.voterSet(Stream.of(local, leader, follower)), + kraftVersion + ) + .withUnknownLeader(epoch) + .withRaftProtocol(raftProtocol) + .build(); + context.assertUnknownLeaderAndNoVotedCandidate(epoch); + + // Sleep a little to ensure that we transition to Prospective + context.time.sleep(context.electionTimeoutMs() * 2L); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isProspective()); + List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0); + assertEquals(2, voteRequests.size()); + + // Simulate PreVote response with granted=true and a leaderId + VoteResponseData voteResponse1 = context.voteResponse(true, OptionalInt.of(leader.id()), epoch); + context.deliverResponse( + voteRequests.get(0).correlationId(), + voteRequests.get(0).destination(), + voteResponse1 + ); + + // Prospective should transition to Follower + context.client.poll(); + assertTrue(context.client.quorum().isFollower()); + assertEquals(leader.id(), context.client.quorum().leaderId().orElse(-1)); Review Comment: The most informative comparison in case of a failure is: ```java assertEquals(OptionalInt.of(leader.id()), context.client.quorum().leaderId()); ``` ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -323,9 +324,21 @@ public void testInitializeAsResignedAndBecomeCandidate(boolean withKip853Rpc) th context.time.sleep(context.electionTimeoutMs()); context.client.poll(); - // Become candidate in a new epoch - assertTrue(context.client.quorum().isCandidate()); - context.assertVotedCandidate(epoch + 1, localId); + // Become unattached with expired election timeout + assertTrue(context.client.quorum().isUnattached()); + assertEquals(epoch + 1, context.currentEpoch()); + + // Become prospective + context.time.sleep(1); Review Comment: Does it need to sleep for 1 ms? If so, why? Isn't calling `poll` enough?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org