jsancio commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2004145500


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -475,6 +563,99 @@ void canRecoverFromSingleNodeCommittedDataLoss(
         scheduler.runUntil(() -> 
cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10));
     }
 
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canAddObserverAndRemovePartitionedVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, 1, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new 
HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition a random voter. Add first observer as new voter
+        int voterIdToRemove = 
cluster.running.get(random.nextInt(numVoters)).id();
+        router.filter(voterIdToRemove, new DropAllTraffic());
+        expectedVoterIds.add(numVoters);
+        addVoter(cluster, scheduler, numVoters, expectedVoterIds);
+
+        // Remove the partitioned voter
+        expectedVoterIds.remove(voterIdToRemove);
+        removeVoter(cluster, scheduler, voterIdToRemove, expectedVoterIds);
+        long highWatermark = cluster.maxHighWatermarkReached();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark 
+ 10, expectedVoterIds));
+    }
+
+    @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
+    void canRemoveAndAddBackVoter(
+        @ForAll int seed,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 2) int numObservers
+    ) {
+        Random random = new Random(seed);
+        Cluster cluster = new Cluster(numVoters, numObservers, random, true);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        Set<Integer> expectedVoterIds = new 
HashSet<>(cluster.initialVoters.keySet());
+
+        initializeClusterAndStartAppending(cluster, router, scheduler, 10);
+
+        // Partition a random voter and remove it
+        int voterId = cluster.running.get(random.nextInt(numVoters)).id();
+        router.filter(voterId, new DropAllTraffic());
+        expectedVoterIds.remove(voterId);
+        removeVoter(cluster, scheduler, voterId, expectedVoterIds);
+
+        // Restore the network and add the voter back
+        router.filter(voterId, new PermitAllTraffic());
+        expectedVoterIds.add(voterId);
+        addVoter(cluster, scheduler, voterId, expectedVoterIds);
+
+        long highWatermark = cluster.maxHighWatermarkReached();
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark 
+ 10));
+    }
+
+    /**
+     * This method initializes the cluster and starts appending to the log.
+     * @param appendActionPeriodMs - the period at which the append action 
should be scheduled, in milliseconds
+     *                             This value is important for tests that add 
voters, since if the cluster is appending
+     *                             records too quickly, it may prevent the new 
voter from catching up to the leader's LEO
+     *                             so that it can be added to the voter set.
+     */
+    private void initializeClusterAndStartAppending(Cluster cluster, 
MessageRouter router, EventScheduler scheduler, int appendActionPeriodMs) {
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 5);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 
appendActionPeriodMs, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+    }
+
+    private void addVoter(Cluster cluster, EventScheduler scheduler, int 
idToAdd, Set<Integer> expectedVoterIds) {
+        scheduler.scheduleUntil(
+            new AddVoterAction(cluster, cluster.running.get(idToAdd)),
+            () -> cluster.leaderHasCommittedVoterSet(expectedVoterIds),
+            0,
+            5,
+            3
+        );

Review Comment:
   Do you need `EventScheduler::scheduleUntil`? The scheduler is not a 
wall-clock or concurrent scheduler. It uses a mocked implementation of time. 
For example, did you consider performing the add voter action in the `runUntil` 
below?
   
   You can run until the add voter RPC has been completed. If you look at 
`RaftMessage.Inbound`, there is a future that will be completed when the 
`RaftMessage` has been handled.
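
A minimal sketch of that idea, assuming a single-threaded scheduler driven by mocked time. `InboundRequest`, `SimScheduler` and `AddVoterSketch` are hypothetical stand-ins written for this illustration and are not the classes in `RaftEventSimulationTest`. The request is issued once, and `runUntil` drains events until its completion future is done:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for an inbound request that carries a completion future.
final class InboundRequest {
    final String name;
    final CompletableFuture<String> completion = new CompletableFuture<>();

    InboundRequest(String name) {
        this.name = name;
    }
}

// Hypothetical single-threaded scheduler: time only advances when an event runs.
final class SimScheduler {
    private final Queue<Runnable> events = new ArrayDeque<>();
    long nowMs = 0;

    void enqueue(Runnable event) {
        events.add(event);
    }

    // Run queued events one at a time until the condition holds.
    void runUntil(BooleanSupplier condition) {
        while (!condition.getAsBoolean()) {
            Runnable next = events.poll();
            if (next == null) {
                throw new IllegalStateException("Ran out of events before the condition was met");
            }
            nowMs += 1; // mocked clock tick, no wall-clock waiting
            next.run();
        }
    }
}

final class AddVoterSketch {
    // Issues the request once and returns the mocked time at which it completed.
    static long addVoterAndWait(SimScheduler scheduler) {
        InboundRequest addVoter = new InboundRequest("ADD_VOTER");
        scheduler.enqueue(() -> { });                                // unrelated poll
        scheduler.enqueue(() -> { });                                // unrelated delivery
        scheduler.enqueue(() -> addVoter.completion.complete("OK")); // request handled
        scheduler.runUntil(addVoter.completion::isDone);
        return scheduler.nowMs;
    }
}
```

Because nothing runs concurrently, the exit condition can simply be the future's `isDone`, and no periodic re-submission of the action is needed in this sketch.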



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -645,39 +929,52 @@ private static class Cluster {
         final AtomicInteger correlationIdCounter = new AtomicInteger();
         final MockTime time = new MockTime();
         final String clusterId = Uuid.randomUuid().toString();
-        final Map<Integer, Node> voters = new HashMap<>();
-        final Map<Integer, PersistentState> nodes = new HashMap<>();
+        final Map<Integer, Node> initialVoters = new HashMap<>();
+        final Map<Integer, PersistentState> persistentStates = new HashMap<>();
         final Map<Integer, RaftNode> running = new HashMap<>();
+        final boolean withKip853;
 
-        private Cluster(int numVoters, int numObservers, Random random) {
+        private Cluster(int numVoters, int numObservers, Random random, 
boolean withKip853) {
             this.random = random;
+            this.withKip853 = withKip853;
 
             for (int nodeId = 0; nodeId < numVoters; nodeId++) {
-                voters.put(nodeId, nodeFromId(nodeId));
-                nodes.put(nodeId, new PersistentState(nodeId));
+                initialVoters.put(nodeId, nodeFromId(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
 
             for (int nodeIdDelta = 0; nodeIdDelta < numObservers; 
nodeIdDelta++) {
                 int nodeId = numVoters + nodeIdDelta;
-                nodes.put(nodeId, new PersistentState(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
         }
 
         Set<InetSocketAddress> endpointsFromIds(Set<Integer> nodeIds) {
-            return voters
+            return running
                 .values()
                 .stream()
-                .filter(node -> nodeIds.contains(node.id()))
+                .filter(node -> nodeIds.contains(node.nodeId))
                 .map(Cluster::nodeAddress)
                 .collect(Collectors.toSet());
         }
 
+        Optional<VoterSet> initialVoterSetFromIds() {
+            return Optional.of(VoterSet.fromMap(initialVoters
+                .values()
+                .stream()
+                .map(node -> new VoterSet.VoterNode(
+                    ReplicaKey.of(node.id(), 
persistentStates.get(node.id()).nodeDirectoryId),
+                    endpointsFromId(node.id(), 
MockNetworkChannel.LISTENER_NAME),
+                    new SupportedVersionRange((short) 1)
+                )).collect(Collectors.toMap(voterNode -> 
voterNode.voterKey().id(), Function.identity()))));
+        }
+
         Set<Integer> nodeIds() {
-            return nodes.keySet();
+            return persistentStates.keySet();
         }
 
-        int majoritySize() {
-            return voters.size() / 2 + 1;
+        int majoritySize(int numberOfVoters) {
+            return numberOfVoters / 2 + 1;
         }

Review Comment:
   This doesn't use any object fields. This can be a static method. Might as 
well move this to `RaftTestUtils`.
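
A minimal sketch of the static form. The holder class name `QuorumMath` is a placeholder chosen for this example, and the argument check is an addition that is not in the PR:

```java
// Hypothetical utility holder; the PR discussion suggests RaftTestUtils as the home.
final class QuorumMath {
    private QuorumMath() { }

    // Smallest number of voters that forms a strict majority.
    static int majoritySize(int numberOfVoters) {
        if (numberOfVoters < 1) {
            throw new IllegalArgumentException("numberOfVoters must be positive: " + numberOfVoters);
        }
        return numberOfVoters / 2 + 1;
    }
}
```

Call sites would then read `QuorumMath.majoritySize(voterSet.size())` without needing a `Cluster` instance.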



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1326,42 +1745,47 @@ private void assertCommittedData(RaftNode node) {
                 return;
             }
 
-            AtomicLong startOffset = new AtomicLong(0);
-            log.earliestSnapshotId().ifPresent(snapshotId -> {
-                assertTrue(snapshotId.offset() <= highWatermark.getAsLong());
-                startOffset.set(snapshotId.offset());
+            AtomicLong startOffset = new AtomicLong(log.startOffset());
 
-                try (SnapshotReader<Integer> snapshot = 
RecordsSnapshotReader.of(
+            // We do not perform this check with KIP-853 enabled
+            // because we write the bootstrap snapshot to all initial voters
+            if (!cluster.withKip853) {

Review Comment:
   This is not because of KIP-853. This is because the bootstrap checkpoint 
(`offset == 0`) doesn't contain any data batches. It only contains control 
batches.



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1477,83 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = 
cluster.nodes.entrySet().stream()
-                    .filter(entry -> 
cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() 
>= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a 
majority of at least one of the following
+                * voter sets has reached the high watermark:
+                * 1. the leader's voter set at the HWM (i.e. the last 
committed voter set)
+                * 2. the leader's lastVoterSet() (which may or may not be 
committed)
+                * Note that 1 and 2 can be the same set, but when they are 
not, lastVoterSet() is uncommitted,
+                * which follows from the AtMostOneUncommittedVoterSet 
invariant.
+                *
+                * A more elaborate check is necessary for this invariant 
because this method can get called after the
+                * leader has updated its lastVoterSet() with a new uncommitted 
voter set, but before the leader has
+                * updated its high watermark using the new voter set. In this 
case, we need to check that the majority
+                * of the last committed voter set has reached the current high 
watermark.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark 
-> {
+                        VoterSet voterSet = 
leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = 
numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < 
cluster.majoritySize(voterSet.size())) {
+                            
leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 
1).ifPresent(otherVoterSet -> {
+                                long nodesReachedHighWatermark = 
numReachedHighWatermark(highWatermark, otherVoterSet.voterIds());
+                                assertTrue(
+                                    nodesReachedHighWatermark >= 
cluster.majoritySize(otherVoterSet.size()),
+                                    "Insufficient nodes have reached current 
high watermark. Expected at least " +
+                                        
cluster.majoritySize(otherVoterSet.size()) + " but got " + 
nodesReachedHighWatermark);
+                            });
+                            return;
+                        }
+                        assertTrue(
+                            numReachedHighWatermark >= 
cluster.majoritySize(voterSet.size()),
+                            "Insufficient nodes have reached current high 
watermark. Expected at least " +
+                                cluster.majoritySize(voterSet.size()) + " but 
got " + numReachedHighWatermark);
+                    });
+                });
+            } else {
+                cluster.leaderHighWatermark().ifPresent(highWatermark -> {
+                    long numReachedHighWatermark = 
numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet());
+                    assertTrue(
+                        numReachedHighWatermark >= 
cluster.majoritySize(cluster.initialVoters.size()),
+                        "Insufficient nodes have reached current high 
watermark");
+                });
+            }
+        }
+
+        private long numReachedHighWatermark(long highWatermark, Set<Integer> 
voterIds) {
+            return cluster.persistentStates.entrySet().stream()
+                .filter(entry -> voterIds.contains(entry.getKey()))
+                .filter(entry -> entry.getValue().log.endOffset().offset() >= 
highWatermark)
+                .count();
+        }
+    }
+
+    private static class AtMostOneUncommittedVotersRecord implements Invariant 
{
+        final Cluster cluster;
+
+        private AtMostOneUncommittedVotersRecord(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        @Override
+        public void verify() {
+            cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                leaderNode.log.readBatches(leaderNode.highWatermark(), 
OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> {
+                    boolean seenUncommittedVotersRecord = false;
+                    if (batch.isControlBatch) {
+                        for (LogEntry entry : batch.entries) {
+                            short typeId = 
ControlRecordType.parseTypeId(entry.record.key());
+                            ControlRecordType type = 
ControlRecordType.fromTypeId(typeId);
+                            if (type == ControlRecordType.KRAFT_VOTERS) {
+                                if (seenUncommittedVotersRecord) {
+                                    fail("More than one uncommitted voters 
record found in the log");
+                                }
+                                seenUncommittedVotersRecord = true;
+                            }
+                        }
+                    }
+                });

Review Comment:
   This feels very implementation specific. The ideal invariant is that logs 
stay consistent irrespective of add/remove operations, leader election and 
network failures.
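
One way to phrase such an implementation-independent invariant is Raft's Log Matching property: if two logs hold an entry from the same epoch at the same offset, they must be identical up to and including that offset. The sketch below assumes logs are plain in-memory lists; `Entry` and `LogConsistency` are hypothetical names for this illustration and are not part of the simulation test:

```java
import java.util.List;

// Hypothetical log entry: the epoch in which it was appended plus its payload.
final class Entry {
    final int epoch;
    final String value;

    Entry(int epoch, String value) {
        this.epoch = epoch;
        this.value = value;
    }

    boolean sameAs(Entry other) {
        return epoch == other.epoch && value.equals(other.value);
    }
}

final class LogConsistency {
    private LogConsistency() { }

    // Returns true if the two logs satisfy the Log Matching property.
    static boolean logsMatch(List<Entry> a, List<Entry> b) {
        int common = Math.min(a.size(), b.size());

        // Find the last offset at which both logs hold an entry from the same epoch.
        int lastAgreed = -1;
        for (int offset = common - 1; offset >= 0; offset--) {
            if (a.get(offset).epoch == b.get(offset).epoch) {
                lastAgreed = offset;
                break;
            }
        }

        // Everything at or before that offset must be identical in both logs.
        for (int offset = 0; offset <= lastAgreed; offset++) {
            if (!a.get(offset).sameAs(b.get(offset))) {
                return false;
            }
        }
        return true;
    }
}
```

A check of this shape makes no reference to voters records or control batches, so it would hold regardless of how add and remove operations are implemented.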



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -645,39 +837,56 @@ private static class Cluster {
         final AtomicInteger correlationIdCounter = new AtomicInteger();
         final MockTime time = new MockTime();
         final String clusterId = Uuid.randomUuid().toString();
-        final Map<Integer, Node> voters = new HashMap<>();
-        final Map<Integer, PersistentState> nodes = new HashMap<>();
+        final Map<Integer, Node> initialVoters = new HashMap<>();
+        final Map<Integer, PersistentState> persistentStates = new HashMap<>();
         final Map<Integer, RaftNode> running = new HashMap<>();
+        final boolean withKip853;
 
-        private Cluster(int numVoters, int numObservers, Random random) {
+        private Cluster(int numVoters, int numObservers, Random random, 
boolean withKip853) {
             this.random = random;
+            this.withKip853 = withKip853;
 
             for (int nodeId = 0; nodeId < numVoters; nodeId++) {
-                voters.put(nodeId, nodeFromId(nodeId));
-                nodes.put(nodeId, new PersistentState(nodeId));
+                initialVoters.put(nodeId, nodeFromId(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
 
             for (int nodeIdDelta = 0; nodeIdDelta < numObservers; 
nodeIdDelta++) {
                 int nodeId = numVoters + nodeIdDelta;
-                nodes.put(nodeId, new PersistentState(nodeId));
+                persistentStates.put(nodeId, new PersistentState(nodeId));
             }
         }
 
         Set<InetSocketAddress> endpointsFromIds(Set<Integer> nodeIds) {
-            return voters
+            return running
                 .values()
                 .stream()
-                .filter(node -> nodeIds.contains(node.id()))
+                .filter(node -> nodeIds.contains(node.nodeId))
                 .map(Cluster::nodeAddress)
                 .collect(Collectors.toSet());
         }
 
+        Optional<VoterSet> initialVoterSetFromIds() {

Review Comment:
   Did you address this comment by Alyssa?
   
   The name doesn't seem to match the implementation. "From what ids?" Did you 
mean `initialVoterSet`? 



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3637,6 +3637,11 @@ QuorumState quorum() {
         return quorum;
     }
 
+    // Visible only for test
+    KRaftControlRecordStateMachine partitionState() {

Review Comment:
   We should not expose this. This is very implementation specific. Did you 
consider adding the functionality you need to `MockLog`, which is the replicated 
log implementation used by the simulation tests?



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -117,40 +126,83 @@ public class RaftEventSimulationTest {
     void canElectInitialLeader(
         @ForAll int seed,
         @ForAll @IntRange(min = 1, max = 5) int numVoters,
-        @ForAll @IntRange(min = 0, max = 5) int numObservers
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean withKip853

Review Comment:
   Do you want to use `KRaftVersion` instead of a boolean?
   
   This comment applies to a few of the tests.
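
A rough sketch of how a version value could replace the flag. `SimKRaftVersion` and `SimCluster` are hypothetical stand-ins written so the example is self-contained; they are not the real `KRaftVersion` or the test's `Cluster`, and the `isReconfigSupported` method here is an assumption of this sketch:

```java
// Hypothetical stand-in for a kraft version enum.
enum SimKRaftVersion {
    KRAFT_VERSION_0(0),
    KRAFT_VERSION_1(1);

    private final short featureLevel;

    SimKRaftVersion(int featureLevel) {
        this.featureLevel = (short) featureLevel;
    }

    short featureLevel() {
        return featureLevel;
    }

    // Assumption for this sketch: only versions above 0 allow changing the voter set.
    boolean isReconfigSupported() {
        return this != KRAFT_VERSION_0;
    }
}

// Hypothetical cluster that stores the version instead of a boolean flag.
final class SimCluster {
    final int numVoters;
    final SimKRaftVersion kraftVersion;

    SimCluster(int numVoters, SimKRaftVersion kraftVersion) {
        this.numVoters = numVoters;
        this.kraftVersion = kraftVersion;
    }

    boolean canChangeVoters() {
        return kraftVersion.isReconfigSupported();
    }
}
```

With an enum parameter, a test signature states which version it exercises, and a later version can be added without introducing a second boolean.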



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -546,6 +728,36 @@ void execute(EventScheduler scheduler) {
         }
     }
 
+    private static class ScheduledUntilConditionMetEvent extends Event {
+        final Random random;
+        final int periodMs;
+        final int jitterMs;
+        final Supplier<Boolean> exitCondition;
+
+        protected ScheduledUntilConditionMetEvent(Runnable action,
+                                int eventId,
+                                Random random,
+                                long deadlineMs,
+                                int periodMs,
+                                int jitterMs,
+                                Supplier<Boolean> exitCondition) {

Review Comment:
   Please use this formatting:
   ```java
           protected ScheduledUntilConditionMetEvent(
               Runnable action,
               int eventId,
               Random random,
               long deadlineMs,
               int periodMs,
               int jitterMs,
               Supplier<Boolean> exitCondition
           ) {
   ```



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -975,7 +1297,35 @@ void poll() {
                     client.poll();
                 } while (client.isRunning() && !messageQueue.isEmpty());
             } catch (Exception e) {
-                throw new RuntimeException("Uncaught exception during poll of 
node " + nodeId, e);
+                throw new RuntimeException("Uncaught exception during poll of 
node " + nodeId + ": " + e, e);
+            }
+        }
+
+        /**
+         * Use this method to handle RPCs that KafkaRaftClient does not 
support but are necessary for testing,
+         * like API_VERSIONS when adding a voter
+         * @param request - the inbound request the RaftNode is handling
+         */
+        void handle(RaftRequest.Inbound request) {
+            ApiKeys apiKey = ApiKeys.forId(request.data().apiKey());
+            switch (apiKey) {
+                case API_VERSIONS:
+                    ApiMessage apiVersionsResponse = new 
ApiVersionsResponse.Builder().
+                        setSupportedFeatures(
+                            Features.supportedFeatures(
+                                Collections.singletonMap(
+                                    KRaftVersion.FEATURE_NAME,
+                                    new 
SupportedVersionRange(KRaftVersion.KRAFT_VERSION_1.featureLevel(), 
KRaftVersion.LATEST_PRODUCTION.featureLevel())

Review Comment:
   ```java
                                       new SupportedVersionRange(
                                           KRaftVersion.KRAFT_VERSION_0.featureLevel(),
                                           KRaftVersion.LATEST_PRODUCTION.featureLevel()
                                       )
   ```
   
   Notice the formatting change, and that the minimum kraft version is 0, not 1.



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = 
cluster.nodes.entrySet().stream()
-                    .filter(entry -> 
cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() 
>= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a 
majority of at least one of:
+                * 1. the leader's voter set at the HWM
+                * 2. the leader's lastVoterSet()
+                * has reached the HWM. We need to perform a more elaborate 
check here because in clusters where
+                * an Add/RemoveVoter request increases/decreases the majority 
of voters value by 1, the leader
+                * could have used either majority value to update its HWM 
value. This is because depending on
+                * whether the leader read the most recent VotersRecord prior 
to updating its HWM value, the number
+                * of nodes (the majority) used to calculate that HWM value is 
different. This matters for invariant
+                * checking because we perform this verification on every 
message delivery.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark 
-> {
+                        VoterSet voterSet = 
leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = 
numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < 
cluster.majoritySize(voterSet.size())) {
+                            
leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 
1).ifPresent(otherVoterSet -> {

Review Comment:
   The scenario that you are trying to handle is for the following log:
   ```
   ... | voterSet1 | ..... | high-watermark | ... | voterSet2 | ...
   ```
   voterSet1 is the latest voter set before the high-watermark. This may be the 
static voter set.
   voterSet2 is the oldest voter set after the high-watermark. It may or may 
not exist. It doesn't exist if dynamic voters are not supported or if the voter 
set has not been reconfigured.
   
   When the HWM lies between voterSet1 and voterSet2, KRaft is allowed to 
commit up to it with either voter set.
   
   Since voterSet1 and voterSet2 can differ by at most one voter, we can 
define the minimal voter set as the smaller of the two voter sets. If voterSet2 
doesn't exist, the minimal voter set is voterSet1.
   
   With this definition, the high-watermark must have been replicated to the 
majority of the minimal voter set. What do you think of this definition? This 
definition works with both static voters and dynamic voters. With static voters, 
voterSet2 never exists.
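
The definition above can be sketched as follows. This is one reading of it, under the assumption that voter sets are plain sets of node ids and that each node's log end offset is available in a map. `MinimalVoterSetCheck` and its method names are hypothetical and do not exist in the PR:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class MinimalVoterSetCheck {
    private MinimalVoterSetCheck() { }

    // The smaller of the two voter sets, or voterSet1 when voterSet2 is absent.
    static Set<Integer> minimalVoterSet(Set<Integer> voterSet1, Optional<Set<Integer>> voterSet2) {
        if (!voterSet2.isPresent()) {
            return voterSet1;
        }
        Set<Integer> other = voterSet2.get();
        return other.size() < voterSet1.size() ? other : voterSet1;
    }

    // True if a majority of the given voters has a log end offset at or past the high-watermark.
    static boolean majorityReached(long highWatermark, Set<Integer> voters, Map<Integer, Long> logEndOffsets) {
        long reached = voters.stream()
            .filter(id -> logEndOffsets.getOrDefault(id, 0L) >= highWatermark)
            .count();
        return reached >= voters.size() / 2 + 1;
    }
}
```

For example, with voterSet1 = {0, 1, 2} and an uncommitted voterSet2 = {0, 1, 2, 3}, the minimal voter set is {0, 1, 2}, so two voters at the high-watermark are enough. With static voters, `voterSet2` is always `Optional.empty()` and the check reduces to the original majority test.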



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
