kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2006278691


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = 
cluster.nodes.entrySet().stream()
-                    .filter(entry -> 
cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() 
>= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a 
majority of at least one of:
+                * 1. the leader's voter set at the HWM
+                * 2. the leader's lastVoterSet()
+                * has reached the HWM. We need to perform a more elaborate 
check here because in clusters where
+                * an Add/RemoveVoter request increases/decreases the majority 
of voters value by 1, the leader
+                * could have used either majority value to update its HWM 
value. This is because depending on
+                * whether the leader read the most recent VotersRecord prior 
to updating its HWM value, the number
+                * of nodes (the majority) used to calculate that HWM value is 
different. This matters for invariant
+                * checking because we perform this verification on every 
message delivery.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark 
-> {
+                        VoterSet voterSet = 
leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = 
numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < 
cluster.majoritySize(voterSet.size())) {
+                            
leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 
1).ifPresent(otherVoterSet -> {

Review Comment:
   I agree that this minimal voter set check is valid, but I think it is too 
relaxed for a cluster with dynamic voters. Specifically, I have concerns about 
this statement:
   > KRaft is allowed to commit up to the HWM between voterSet1 or voterSet2, 
with either voter set.
   
   From my understanding of the code, once KRaft becomes aware of voterSet2, 
which happens at the following point in execution because 
`partitionState.lastVoterSet()` is set to voterSet2: `appendAsLeader -> 
partitionState.updateState()`, it should be using voterSet2 to calculate the 
HWM thereafter, which it does in the following code from 
`KafkaRaftClient#updateLeaderEndOffsetAndTimestamp`, which is called in 
`flushLeaderLog`: 
   ```
   if (state.updateLocalState(endOffsetMetadata, 
partitionState.lastVoterSet())) {
               onUpdateLeaderHighWatermark(state, currentTimeMs);
           }
   ```
   
   The only point where KafkaRaftClient state is the following:
   
   1. `partitionState.lastVoterSet() = votersSet2`
   2. HWM value was calculated using voterSet1
   
   is between `appendAsLeader -> partitionState.updateState()` and 
`flushLeaderLog` when voterSet2 is written to the log.
   
   When checking the invariant in `verify()`, the RaftNode either has an 
uncommitted voter set in its log or it doesn't. If it doesn't, using a majority 
`lastVoterSet()` to verify the HWM value is always correct. If it does, using a 
majority of `lastVoterSet()` to verify the HWM is correct except in the case I 
mentioned above.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to