showuon commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2544517164


##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,82 @@ public void testControllersAutoJoinStandaloneVoter() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                build();
+
+        // Configure the initial voters with one voter having a different 
directory ID.
+        // This simulates the case where the controller failed and is brought 
back up with a different directory ID.
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        final var oldDirectoryId = Uuid.randomUuid();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
+                        oldDirectoryId : controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+                setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
true).
+                setInitialVoterSet(initialVoters).
+                build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+                // Remove 3002 from voter set
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // if there are no node 3002, it should be removed
+                        return;
+                    }

Review Comment:
   We should fail the test in this unexpected case, right?
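
   A minimal sketch of one way to fail fast here, assuming the removal is issued once rather than inside a retry loop. `VoterPrecondition` and `requireVoter` are hypothetical names for illustration, not part of the PR or of Kafka:

```java
import java.util.Map;

// Hypothetical helper: turns "node is unexpectedly missing" into a test failure
// instead of a silent early return.
final class VoterPrecondition {
    static <V> V requireVoter(Map<Integer, V> voters, int nodeId) {
        V directoryId = voters.get(nodeId);
        if (directoryId == null) {
            // The node should still be a voter at this point, so its absence is an error.
            throw new AssertionError(
                "Expected node " + nodeId + " in voter set, but found " + voters.keySet());
        }
        return directoryId;
    }
}
```

   In the test, the returned directory ID could then presumably be passed to `admin.removeRaftVoter(3002, ...)` in place of `voters.get(3002)`.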



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,11 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the 
initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if starting voters contain node id, it can't join to the cluster
+            // because it is already in.

Review Comment:
   nit: if [the] starting voters contain [the node id of this node], it can't 
[auto] join to the cluster ...



##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,82 @@ public void testControllersAutoJoinStandaloneVoter() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                build();
+
+        // Configure the initial voters with one voter having a different 
directory ID.
+        // This simulates the case where the controller failed and is brought 
back up with a different directory ID.
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        final var oldDirectoryId = Uuid.randomUuid();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
+                        oldDirectoryId : controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+                setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
true).
+                setInitialVoterSet(initialVoters).
+                build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+                // Remove 3002 from voter set
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // if there are no node 3002, it should be removed
+                        return;
+                    }
+
+                    admin.removeRaftVoter(3002, voters.get(3002)).all().get();
+                    assertEquals(Set.of(3000, 3001), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+
+                // do not join the voter set in the next twenty seconds
+                for (int i = 0; i < 20; ++i) {
+                    TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                        TestUtils.retryOnExceptionWithTimeout(30_000, 100, () 
-> {
+                            Map<Integer, Uuid> voters = findVoterDirs(admin);
+                            if (!voters.containsKey(3002)) {
+                                // if there are no node 3002, it should be 
removed
+                                return;
+                            }
+
+                            admin.removeRaftVoter(3002, 
voters.get(3002)).all().get();
+                            assertEquals(Set.of(3000, 3001), voters.keySet());
+                            for (int replicaId : new int[] {3000, 3001}) {
+                                
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                            }
+                        });
+                    });
+                    Thread.sleep(1000);
+                }

Review Comment:
   This test doesn't make sense to me.
   1. `do not join the voter set in the next twenty seconds`: the 20 seconds is 
not accurate, because a single `retryOnExceptionWithTimeout` call can take up 
to 30 seconds.
   2. Why do we need 2 nested `retryOnExceptionWithTimeout` calls here? 
   3. The test is not testing what we expect. 
     3.1. What are you asserting here? I can only see assertions that nodes 3000 
and 3001 are in the voters, but we want to test 3002, right? If 3002 is in the 
voters, the test can still pass!?
     3.2. If you want to test removing the voter 20 times, then why do we need 
L257~L269 above? It's only doing it a 21st time.
     3.3. Even if the above comments are all addressed, the test is still 
unreliable. The test can still pass as long as the auto-join happens after 
the assertion.
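
   For points 1 and 3.1, one possible shape is a check that polls for a fixed window and asserts directly on 3002. This is only a sketch: `VoterAbsenceCheck` and `assertNeverContains` are hypothetical names, and it does not resolve 3.3, since an auto-join after the window ends would still go unnoticed.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper: checks that a node stays out of the voter set for a bounded window.
final class VoterAbsenceCheck {
    static void assertNeverContains(
        Supplier<Set<Integer>> voterIds,
        int nodeId,
        long windowMs,
        long pollIntervalMs
    ) {
        final long deadlineNanos = System.nanoTime() + windowMs * 1_000_000L;
        do {
            Set<Integer> current = voterIds.get();
            // Assert on the node under test itself, not on the remaining voters.
            if (current.contains(nodeId)) {
                throw new AssertionError(
                    "Node " + nodeId + " rejoined the voter set: " + current);
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling the voter set", e);
            }
        } while (System.nanoTime() < deadlineNanos);
    }
}
```

   The supplier would presumably wrap something like `findVoterDirs(admin).keySet()`, with any checked exception handled inside the lambda. No retry wrapper is used around the check, so the window stays close to the requested duration.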



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3347,7 +3357,7 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
         );
     }
 
-    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+    private boolean maybeAutoJoin(FollowerState state, long currentTimeMs) {

Review Comment:
   The method returns a boolean, so maybe rename it to `shouldAutoJoin`?


