kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2006461608
##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                 * For clusters running in KIP-853 mode, we check that a majority of at least one of:
+                 * 1. the leader's voter set at the HWM
+                 * 2. the leader's lastVoterSet()
+                 * has reached the HWM. We need to perform a more elaborate check here because in clusters where
+                 * an Add/RemoveVoter request increases/decreases the majority of voters value by 1, the leader
+                 * could have used either majority value to update its HWM value. This is because depending on
+                 * whether the leader read the most recent VotersRecord prior to updating its HWM value, the number
+                 * of nodes (the majority) used to calculate that HWM value is different. This matters for invariant
+                 * checking because we perform this verification on every message delivery.
+                 * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+                        VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+                            leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {

Review Comment:
If the main concern is to avoid doing a separate check when the cluster has KIP-853 enabled, I think I can just remove the `if` statement: using `lastVoterSet()` with a static voter set always works (honestly, I'm not sure why the check was there in the first place). This way we no longer rely on the implementation detail that consecutive voter sets can differ by only one voter.
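For illustration, the invariant described in the diff's comment can be sketched in isolation. This is a simplified model under stated assumptions, not the code in `RaftEventSimulationTest`: the class `HighWatermarkInvariantSketch`, the method `invariantHolds`, and the representation of voter sets as `Set<Integer>` and log end offsets as `Map<Integer, Long>` are all hypothetical stand-ins for the cluster state the real test inspects.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of the invariant; not the actual test code.
final class HighWatermarkInvariantSketch {

    // Majority of a voter set of the given size, e.g. 2 of 3 or 3 of 4.
    static int majoritySize(int voterCount) {
        return voterCount / 2 + 1;
    }

    // Counts the voters whose log end offset is at or beyond the high watermark.
    // Voters with no known log end offset are treated as not having reached it.
    static long numReachedHighWatermark(
        long highWatermark,
        Set<Integer> voterIds,
        Map<Integer, Long> logEndOffsets
    ) {
        return voterIds.stream()
            .filter(id -> logEndOffsets.getOrDefault(id, -1L) >= highWatermark)
            .count();
    }

    // True if a majority of the given voter set has reached the high watermark.
    static boolean majorityReached(
        long highWatermark,
        Set<Integer> voterIds,
        Map<Integer, Long> logEndOffsets
    ) {
        return numReachedHighWatermark(highWatermark, voterIds, logEndOffsets)
            >= majoritySize(voterIds.size());
    }

    // Checks the latest voter set first; only if that fails does it fall back
    // to the voter set that was in effect at the high watermark, if one exists.
    static boolean invariantHolds(
        long highWatermark,
        Set<Integer> lastVoterSet,
        Optional<Set<Integer>> voterSetAtHighWatermark,
        Map<Integer, Long> logEndOffsets
    ) {
        if (majorityReached(highWatermark, lastVoterSet, logEndOffsets)) {
            return true;
        }
        return voterSetAtHighWatermark
            .map(voters -> majorityReached(highWatermark, voters, logEndOffsets))
            .orElse(false);
    }
}
```

In this model, a static voter set means `lastVoterSet` never changes, so the first branch alone decides the result and no separate non-KIP-853 path is needed.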