ahuang98 commented on code in PR #19416: URL: https://github.com/apache/kafka/pull/19416#discussion_r2052997970
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java: ########## @@ -2130,6 +2136,444 @@ void testFollowerSendsUpdateVoter() throws Exception { context.assertFetchRequestData(fetchRequest, epoch, 0L, 0); } + @ParameterizedTest + @EnumSource(value = Errors.class, names = {"NONE", "UNSUPPORTED_VERSION"}) + void testFollowerSendsUpdateVoterWithKraftVersion0(Errors updateVoterError) throws Exception { + ReplicaKey local = replicaKey(randomReplicaId(), true); + ReplicaKey voter1 = replicaKey(local.id() + 1, true); Review Comment: should we have a version of this test where the voters start out w/o directory ids? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java: ########## @@ -2085,8 +2091,8 @@ void testFollowerSendsUpdateVoter() throws Exception { .withLocalListeners(localListeners) .build(); - // waiting for 3 times the fetch timeout sends an update voter - for (int i = 0; i < 3; i++) { + // waiting for FETCH request until the UpdateRaftVoter request is set Review Comment: sent? ########## raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java: ########## @@ -609,6 +708,161 @@ public void testBeginQuorumEpochTimer(boolean withDirectoryId) { assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds())); } + @Test + public void testVolatileVoters() { + int follower1 = 1; + long epochStartOffset = 10L; + + VoterSet voters = localWithRemoteVoterSet(IntStream.of(follower1), false); + LeaderState<?> state = newLeaderState( + voters, + epochStartOffset, + KRaftVersion.KRAFT_VERSION_0 + ); + + var votersWithLeaderUpdated = state.volatileVoters().get(); + assertEquals( + voters.updateVoterIgnoringDirectoryId(localVoterNode).get(), + votersWithLeaderUpdated.voters() + ); + + var updatedVoters = new KRaftVersionUpgrade.Voters( + votersWithLeaderUpdated + .voters() + .updateVoterIgnoringDirectoryId(VoterSetTest.voterNode(follower1, true)) + .get() + ); + + // Upate in-memory voter and check state + assertTrue( + state.compareAndSetVolatileVoters(votersWithLeaderUpdated, updatedVoters) + ); + assertEquals(updatedVoters, state.volatileVoters().get()); + + // Unable to perform atomic update + assertFalse( + state.compareAndSetVolatileVoters(votersWithLeaderUpdated, updatedVoters) + ); + } + + @Test + public void testInvalidMaybeAppendUpgradedKRaftVersion() { + int follower1 = 1; + int follower2 = 2; + long epochStartOffset = 10L; + + VoterSet persistedVoters = localWithRemoteVoterSet(IntStream.of(follower1, follower2), false); + LeaderState<?> state = newLeaderState( + persistedVoters, + epochStartOffset, + KRaftVersion.KRAFT_VERSION_0 + ); + + assertThrows( Review Comment: nit, could you add short comments that explain why each is invalid? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java: ########## @@ -2085,8 +2091,8 @@ void testFollowerSendsUpdateVoter() throws Exception { .withLocalListeners(localListeners) .build(); - // waiting for 3 times the fetch timeout sends an update voter - for (int i = 0; i < 3; i++) { + // waiting for FETCH request until the UpdateRaftVoter request is set + for (int i = 0; i < NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD; i++) { Review Comment: is there any point to this for loop if `NUMBER_FETCH_TIMEOUTS_IN_UPDATE_PERIOD` is statically set to 1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org