ahuang98 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1968254346


##########
raft/src/main/java/org/apache/kafka/raft/VoterSet.java:
##########
@@ -343,6 +343,10 @@ public ReplicaKey voterKey() {
             return voterKey;
         }
 
+        public int voterId() {

Review Comment:
   Can this be derived from `voterKey().id()` instead?
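
   A minimal sketch of what that derivation could look like. `ReplicaKeySketch` and `VoterNodeSketch` are hypothetical stand-ins, not the real `ReplicaKey` or `VoterSet.VoterNode` classes; the only assumption taken from this thread is that the key exposes `id()`.

```java
// Hypothetical stand-in for ReplicaKey; only the id() accessor matters here.
final class ReplicaKeySketch {
    private final int id;

    ReplicaKeySketch(int id) {
        this.id = id;
    }

    int id() {
        return id;
    }
}

// Hypothetical stand-in for the voter node that owns a voter key.
final class VoterNodeSketch {
    private final ReplicaKeySketch voterKey;

    VoterNodeSketch(ReplicaKeySketch voterKey) {
        this.voterKey = voterKey;
    }

    ReplicaKeySketch voterKey() {
        return voterKey;
    }

    // Derived from the key rather than stored separately, so the two
    // values cannot drift apart.
    int voterId() {
        return voterKey().id();
    }
}
```

   If the accessor only delegates like this, callers could also call `voterKey().id()` directly and the extra method could be dropped.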



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -487,6 +555,12 @@ private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) {
         return scheduler;
     }
 
+    private EventScheduler schedulerWithKip853Invariants(Cluster cluster) {
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+        scheduler.addInvariant(new AtMostOneUncommittedVoterSet(cluster));

Review Comment:
   Doesn't this invariant apply to all the tests? If there is a reason it 
can't be part of the default invariant set, let's add a comment explaining 
why.



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of:
+                * 1. the leader's voter set at the HWM
+                * 2. the leader's lastVoterSet()
+                * has reached the HWM. We need to perform a more elaborate check here because in clusters where
+                * an Add/RemoveVoter request increases/decreases the majority of voters value by 1, the leader
+                * could have used either majority value to update its HWM value. This is because depending on
+                * whether the leader read the most recent VotersRecord prior to updating its HWM value, the number
+                * of nodes (the majority) used to calculate that HWM value is different. This matters for invariant
+                * checking because we perform this verification on every message delivery.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+                        VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+                            leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {

Review Comment:
   I'm having a hard time understanding when this would be necessary. We call 
verify on every "event" (e.g. a new record written to the leader; what else 
qualifies?). Is the concern that we might check whether a majority has reached 
the HWM before the HWM reflects the current voter set?
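
   To make the question concrete, here is a rough, self-contained sketch of the "either voter set" check that the diff above describes. It is not the PR's code: `HwmInvariantSketch`, `holds`, and the plain `Set`/`Map` inputs are hypothetical, and `majoritySize(n)` is assumed to be `n / 2 + 1`.

```java
import java.util.Map;
import java.util.Set;

final class HwmInvariantSketch {
    // Assumption: a majority of n voters is n / 2 + 1.
    static int majoritySize(int voterCount) {
        return voterCount / 2 + 1;
    }

    // Counts the voters whose log end offset is at or past the high watermark.
    static long numReachedHighWatermark(long highWatermark,
                                        Set<Integer> voterIds,
                                        Map<Integer, Long> logEndOffsets) {
        return voterIds.stream()
            .filter(id -> logEndOffsets.getOrDefault(id, -1L) >= highWatermark)
            .count();
    }

    static boolean majorityReached(long highWatermark,
                                   Set<Integer> voterIds,
                                   Map<Integer, Long> logEndOffsets) {
        return numReachedHighWatermark(highWatermark, voterIds, logEndOffsets)
            >= majoritySize(voterIds.size());
    }

    // The invariant passes if a majority of either voter set has reached the HWM.
    static boolean holds(long highWatermark,
                         Set<Integer> lastVoterSet,
                         Set<Integer> voterSetAtHwm,
                         Map<Integer, Long> logEndOffsets) {
        return majorityReached(highWatermark, lastVoterSet, logEndOffsets)
            || majorityReached(highWatermark, voterSetAtHwm, logEndOffsets);
    }
}
```

   With voters {1, 2, 3} growing to {1, 2, 3, 4} and only nodes 1 and 2 at the HWM, the old set satisfies its majority of 2 while the new set misses its majority of 3, which is the situation the fallback branch appears to be guarding against.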
   



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of:
+                * 1. the leader's voter set at the HWM
+                * 2. the leader's lastVoterSet()
+                * has reached the HWM. We need to perform a more elaborate check here because in clusters where
+                * an Add/RemoveVoter request increases/decreases the majority of voters value by 1, the leader
+                * could have used either majority value to update its HWM value. This is because depending on
+                * whether the leader read the most recent VotersRecord prior to updating its HWM value, the number

Review Comment:
   Correct me if I'm wrong, but wouldn't this always be the case, i.e. the 
leader reads its most recent VotersRecord prior to updating the HWM?
   
   `appendAsLeader -> updateState -> maybeLoadLog` (will catch the latest VotersRecord)
   is called before
   `flushLeaderLog -> maybeUpdateHighWatermark`
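
   As background for why the ordering matters, here is a toy model of how the voter set in use changes the offset a leader could advance the HWM to. This is a simplified illustration, not Kafka's leader code; `HwmCandidateSketch` and `candidate` are hypothetical names, and a majority is assumed to be `n / 2 + 1`.

```java
import java.util.Arrays;

final class HwmCandidateSketch {
    // Returns the largest offset that a majority of the given voters have
    // replicated, i.e. the (n / 2 + 1)-th largest log end offset.
    static long candidate(long[] voterEndOffsets) {
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted); // ascending order
        int majority = sorted.length / 2 + 1;
        return sorted[sorted.length - majority];
    }
}
```

   For end offsets {10, 8, 5} the candidate is 8, while adding a fourth voter at offset 0 lowers it to 5. If the latest VotersRecord is always loaded before `maybeUpdateHighWatermark` runs, only the second calculation would ever apply after the voter is added.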



##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of:
+                * 1. the leader's voter set at the HWM

Review Comment:
   nit: the wording is a bit unclear (e.g. reading it as "check that a majority 
of at least one of the leader's voter set at the HWM" doesn't make sense to me)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
