showuon commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2548669325


##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(), controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+                setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
true).
+                setInitialVoterSet(initialVoters).
+                build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+
+                // Remove 3002 from the voter set
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+                admin.removeRaftVoter(3002, dirId).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // if there are no node 3002, it should be return
+                        return;
+                    }

Review Comment:
   This can be removed now because we already do the assertion below.
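
A minimal sketch of why the early return is redundant, assuming the retry helper simply reruns the block until it stops throwing. `retryOnException` and the simulated voter set below are hypothetical stand-ins written for illustration, not the Kafka `TestUtils` implementation or the real cluster state.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class RetryWithoutEarlyReturn {
    /**
     * Hypothetical stand-in for a retry helper: rerun the check until it
     * stops throwing, or rethrow the last failure once the timeout passes.
     */
    static void retryOnException(long timeoutMs, long pauseMs, Runnable check) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                check.run();
                return;
            } catch (AssertionError | RuntimeException e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e;
                }
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", ie);
                }
            }
        }
    }

    /**
     * Simulated voter lookup: node 3002 is still reported for the first two
     * polls and is gone from the third poll onward. Returns the poll count.
     */
    static int pollsUntilRemoved() {
        AtomicInteger polls = new AtomicInteger();
        retryOnException(5_000, 10, () -> {
            Set<Integer> voters = polls.incrementAndGet() < 3
                ? Set.of(3000, 3001, 3002)
                : Set.of(3000, 3001);
            // No early return: the assertion alone decides pass or retry.
            if (!voters.equals(Set.of(3000, 3001))) {
                throw new AssertionError("unexpected voters: " + voters);
            }
        });
        return polls.get();
    }
}
```

The assertion fails while 3002 is still listed, which triggers another retry, and passes as soon as 3002 is gone, so a separate `containsKey` branch adds nothing.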



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java:
##########
@@ -317,6 +321,131 @@ private void pollAndDeliverFetchToUpdateVoterSet(
         context.client.poll();
     }
 
+    @Test
+    public void testBootstrapVoterSetDoesNotSendAddVoterAfterRemove() throws 
Exception {

Review Comment:
   Nice tests added in KafkaRaftClientAutoJoinTest. Thanks.



##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(), controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+                setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
true).
+                setInitialVoterSet(initialVoters).
+                build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+
+                // Remove 3002 from the voter set
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+                admin.removeRaftVoter(3002, dirId).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // if there are no node 3002, it should be return
+                        return;
+                    }
+
+                    assertEquals(Set.of(3000, 3001), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+                // We need to wait update timer to expire and then send 
AddVoter request automatically.
+                Thread.sleep(2000);
+
+                // Verify 3002 is already fetch and does not send add voter 
request

Review Comment:
   We should make this comment clearer; otherwise, no one will know exactly 
what we're doing here.
   `Because the auto-join attempt happens before the fetch request is sent to 
the leader, we first verify that node 3002 has already fetched from the active 
controller by checking that its high watermark has advanced. We then verify 
that node 3002 is not in the voter set.`
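
The "wait until the high watermark advances" step can be sketched on its own. This assumes only a `LongSupplier` that reports the current high watermark; `waitForAdvance` and its parameters are hypothetical names for illustration, not part of the Kafka test utilities.

```java
import java.util.function.LongSupplier;

class HighWatermarkAdvance {
    /**
     * Polls the supplier until it reports a value greater than the baseline.
     * Returns true if the value advanced before the timeout, false otherwise.
     */
    static boolean waitForAdvance(LongSupplier highWatermark, long baseline,
                                  long timeoutMs, long pauseMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (highWatermark.getAsLong() > baseline) {
                return true;
            }
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check so a value that advanced right at the deadline counts.
        return highWatermark.getAsLong() > baseline;
    }
}
```

In the test under review, the baseline would be the high watermark read from node 3002 right after the sleep. Seeing it advance shows the node is still fetching from the leader, so a voter set that still excludes 3002 at that point means no automatic join took place.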



##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(), controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+                setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
true).
+                setInitialVoterSet(initialVoters).
+                build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+
+                // Remove 3002 from the voter set
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+                admin.removeRaftVoter(3002, dirId).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // if there are no node 3002, it should be return
+                        return;
+                    }
+
+                    assertEquals(Set.of(3000, 3001), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+                // We need to wait update timer to expire and then send 
AddVoter request automatically.
+                Thread.sleep(2000);

Review Comment:
   1. We can add a variable for the 2 seconds to explain it, e.g. 
`defaultUpdateVoterSetPeriodTimeout`.
   2. It's the "update voter set timer".
   3. `We need to wait for the update voter set timer to expire so that the 
addVoter request can be sent.`



##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemovedControllerWontJoinAgain() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+                setNumBrokerNodes(1).
+                setNumControllerNodes(3).
+                build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(), controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+                setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
true).
+                setInitialVoterSet(initialVoters).
+                build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                // Static voter set is initialized
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+
+                // Remove 3002 from the voter set
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+                admin.removeRaftVoter(3002, dirId).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    if (!voters.containsKey(3002)) {
+                        // if there are no node 3002, it should be return
+                        return;
+                    }
+
+                    assertEquals(Set.of(3000, 3001), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+
+                // We need to wait update timer to expire and then send 
AddVoter request automatically.
+                Thread.sleep(2000);
+
+                // Verify 3002 is already fetch and does not send add voter 
request
+                long removedAtHighWatermark = 
cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong();
+                TestUtils.waitForCondition(() ->
+                        
cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong()
 > removedAtHighWatermark,
+                    30_000, 100, () -> "High watermark is not advanced in 
30000ms"
+                );
+
+                // 3002 does not join the voter set after high watermark 
advance
+                Map<Integer, Uuid> voters = findVoterDirs(admin);
+                assertEquals(Set.of(3000, 3001), voters.keySet());
+                for (int replicaId : new int[] {3000, 3001}) {
+                    
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                }
+            }

Review Comment:
   After the final verification, could we add one more check to verify that, 
in this situation, manually adding the voter via the admin client still works? 
Something like:
   ```
   // manually add 3002 as a voter
   admin.addRaftVoter(3002, .....)
   // verify it has joined successfully
   TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
       Map<Integer, Uuid> voters = findVoterDirs(admin);
       assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
       ...
   });
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
