showuon commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2548669325
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter()
throws Exception {
}
}
+ @Test
+ public void testRemovedControllerWontJoinAgain() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(), controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG,
true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ // Static voter set is initialized
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+
+ // Remove 3002 from the voter set
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+ admin.removeRaftVoter(3002, dirId).all().get();
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ if (!voters.containsKey(3002)) {
+ // if there are no node 3002, it should be return
+ return;
+ }
Review Comment:
This can be removed now because we already do the assertion below.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientAutoJoinTest.java:
##########
@@ -317,6 +321,131 @@ private void pollAndDeliverFetchToUpdateVoterSet(
context.client.poll();
}
+ @Test
+ public void testBootstrapVoterSetDoesNotSendAddVoterAfterRemove() throws
Exception {
Review Comment:
Nice tests added in KafkaRaftClientAutoJoinTest. Thanks.
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter()
throws Exception {
}
}
+ @Test
+ public void testRemovedControllerWontJoinAgain() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(), controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG,
true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ // Static voter set is initialized
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+
+ // Remove 3002 from the voter set
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+ admin.removeRaftVoter(3002, dirId).all().get();
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ if (!voters.containsKey(3002)) {
+ // if there are no node 3002, it should be return
+ return;
+ }
+
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+ // We need to wait update timer to expire and then send
AddVoter request automatically.
+ Thread.sleep(2000);
+
+ // Verify 3002 is already fetch and does not send add voter
request
Review Comment:
We'd better make the comment clearer here; otherwise, no one will know
exactly what we're doing here. Suggested wording:
`Because auto-join happens before a fetch request is sent to the
leader, here we verify that node 3002 has already fetched from the active
controller by checking that its high watermark has advanced. Then we verify
that node 3002 does not exist in the voter set.`
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter()
throws Exception {
}
}
+ @Test
+ public void testRemovedControllerWontJoinAgain() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(), controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG,
true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ // Static voter set is initialized
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+
+ // Remove 3002 from the voter set
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+ admin.removeRaftVoter(3002, dirId).all().get();
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ if (!voters.containsKey(3002)) {
+ // if there are no node 3002, it should be return
+ return;
+ }
+
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+ // We need to wait update timer to expire and then send
AddVoter request automatically.
+ Thread.sleep(2000);
Review Comment:
1. We can add a named variable for the 2-second sleep to explain it, e.g.
`defaultUpdateVoterSetPeriodTimeout`.
2. The timer is called the "update voter set timer", not the "update timer".
3. Suggested comment: `We need to wait for the update voter set timer to expire so that the
addVoter request can be sent.`
##########
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -218,6 +218,75 @@ public void testControllersAutoJoinStandaloneVoter()
throws Exception {
}
}
+ @Test
+ public void testRemovedControllerWontJoinAgain() throws Exception {
+ final var nodes = new TestKitNodes.Builder().
+ setNumBrokerNodes(1).
+ setNumControllerNodes(3).
+ build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(), controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (KafkaClusterTestKit cluster = new
KafkaClusterTestKit.Builder(nodes).
+ setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG,
true).
+ setInitialVoterSet(initialVoters).
+ build()
+ ) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ // Static voter set is initialized
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+
+ // Remove 3002 from the voter set
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3002).metadataDirectoryId();
+ admin.removeRaftVoter(3002, dirId).all().get();
+ TestUtils.retryOnExceptionWithTimeout(30_000, 100, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ if (!voters.containsKey(3002)) {
+ // if there are no node 3002, it should be return
+ return;
+ }
+
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ });
+
+ // We need to wait update timer to expire and then send
AddVoter request automatically.
+ Thread.sleep(2000);
+
+ // Verify 3002 is already fetch and does not send add voter
request
+ long removedAtHighWatermark =
cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong();
+ TestUtils.waitForCondition(() ->
+
cluster.controllers().get(3002).raftManager().client().highWatermark().getAsLong()
> removedAtHighWatermark,
+ 30_000, 100, () -> "High watermark is not advanced in
30000ms"
+ );
+
+ // 3002 does not join the voter set after high watermark
advance
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001}) {
+
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(),
voters.get(replicaId));
+ }
+ }
Review Comment:
After the final verification, could we add one more check to verify that,
in this situation, manually adding the voter via the admin client still works?
Something like:
```
// manual add 3002 voter
admin.addRaftVoter(3002, .....)
// verify it's joined successfully
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
...
});
```
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]