kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2006031583
##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -475,6 +563,99 @@ void canRecoverFromSingleNodeCommittedDataLoss(
         scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddObserverAndRemovePartitionedVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, 1, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition a random voter. Add first observer as new voter
+        int voterIdToRemove = cluster.running.get(random.nextInt(numVoters)).id();
+        router.filter(voterIdToRemove, new DropAllTraffic());
+        expectedVoterIds.add(numVoters);
+        addVoter(cluster, scheduler, numVoters, expectedVoterIds);
+
+        // Remove the partitioned voter
+        expectedVoterIds.remove(voterIdToRemove);
+        removeVoter(cluster, scheduler, voterIdToRemove, expectedVoterIds);
+        long highWatermark = cluster.maxHighWatermarkReached();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10, expectedVoterIds));
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canRemoveAndAddBackVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 2) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition a random voter and remove it
+        int voterId = cluster.running.get(random.nextInt(numVoters)).id();
+        router.filter(voterId, new DropAllTraffic());
+        expectedVoterIds.remove(voterId);
+        removeVoter(cluster, scheduler, voterId, expectedVoterIds);
+
+        // Restore the network and add the voter back
+        router.filter(voterId, new PermitAllTraffic());
+        expectedVoterIds.add(voterId);
+        addVoter(cluster, scheduler, voterId, expectedVoterIds);
+
+        long highWatermark = cluster.maxHighWatermarkReached();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
+    }
+
+    /**
+     * This method initializes the cluster and starts appending to the log.
+     * @param appendActionPeriodMs - the period at which the append action should be scheduled, in milliseconds.
+     *        This value is important for tests that add voters, since if the cluster is appending
+     *        records too quickly, it may prevent the new voter from catching up to the leader's LEO
+     *        so that it can be added to the voter set.
+     */
+    private void initializeClusterAndStartAppending(Cluster cluster, MessageRouter router, EventScheduler scheduler, int appendActionPeriodMs) {
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 5);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, appendActionPeriodMs, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+    }
+
+    private void addVoter(Cluster cluster, EventScheduler scheduler, int idToAdd, Set<Integer> expectedVoterIds) {
+        scheduler.scheduleUntil(
+            new AddVoterAction(cluster, cluster.running.get(idToAdd)),
+            () -> cluster.leaderHasCommittedVoterSet(expectedVoterIds),
+            0,
+            5,
+            3
+        );

Review Comment:
   I remember running into a lot of issues when trying to use `runUntil` directly, mainly because we need to do two things:
   1. repeatedly schedule the operation until an RPC completes successfully
   2. stop scheduling that operation

   It is not sufficient to use the existing `EventScheduler#schedule`, because that keeps scheduling the RPC from that point onward. What happens if I want to test a case like removing and adding back the same voter? Maybe the name `scheduleUntil` is not clear, but it means that we schedule the event until some condition is met, and then we stop scheduling that event.
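The comment above describes the intended contract of the `scheduleUntil` helper: keep re-issuing the add/remove-voter RPC on each scheduling interval until a stop condition holds, then stop scheduling it. Below is a minimal, self-contained sketch of that "schedule until a condition holds" idea; the `ConditionalScheduler`, `scheduleUntil`, and `tick` names are illustrative assumptions and do not mirror the actual `EventScheduler` implementation in this test.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified scheduler illustrating "schedule an action until a
// condition holds". Names here are assumptions for illustration only; they do
// not reflect the real EventScheduler used by RaftEventSimulationTest.
public class ConditionalScheduler {

    // An action paired with the predicate that, once true, stops further rescheduling.
    private record ConditionalTask(Runnable action, BooleanSupplier stopCondition) { }

    private final Queue<ConditionalTask> tasks = new ArrayDeque<>();

    // Run the action on every tick until the stop condition is satisfied.
    public void scheduleUntil(Runnable action, BooleanSupplier stopCondition) {
        tasks.add(new ConditionalTask(action, stopCondition));
    }

    // One simulated tick: run each pending task, dropping those whose condition is now met.
    public void tick() {
        int pending = tasks.size();
        for (int i = 0; i < pending; i++) {
            ConditionalTask task = tasks.poll();
            if (task.stopCondition().getAsBoolean()) {
                continue;                 // condition met: stop rescheduling this task
            }
            task.action().run();          // condition not yet met: run the action again
            tasks.add(task);              // and keep it scheduled for the next tick
        }
    }
}
```

In the test's terms, the action would be an `AddVoterAction` (or its remove-voter counterpart) and the stop condition something like `cluster.leaderHasCommittedVoterSet(expectedVoterIds)`; once the leader has committed the new voter set, the helper stops re-issuing the RPC, which a plain `schedule` would keep firing indefinitely.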