chia7712 commented on code in PR #19589:
URL: https://github.com/apache/kafka/pull/19589#discussion_r2267506636


##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java:
##########
@@ -452,15 +480,50 @@ private void formatNode(
             if 
(nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) {
                 StringBuilder dynamicVotersBuilder = new StringBuilder();
                 String prefix = "";
-                for (TestKitNode controllerNode : 
nodes.controllerNodes().values()) {
-                    int port = socketFactoryManager.
-                        getOrCreatePortForListener(controllerNode.id(), 
controllerListenerName);
-                    dynamicVotersBuilder.append(prefix);
-                    prefix = ",";
-                    
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
-                        controllerNode.id(), port, 
controllerNode.metadataDirectoryId()));
+                if (standalone) {
+                    if (nodeId == TestKitDefaults.CONTROLLER_ID_OFFSET) {
+                        final var controllerNode = 
nodes.controllerNodes().get(nodeId);
+                        dynamicVotersBuilder.append(
+                            String.format(
+                                "%d@localhost:%d:%s",
+                                controllerNode.id(),
+                                socketFactoryManager.
+                                    
getOrCreatePortForListener(controllerNode.id(), controllerListenerName),
+                                controllerNode.metadataDirectoryId()
+                            )
+                        );
+                        
formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString()));
+                    } else {
+                        formatter.setNoInitialControllersFlag(true);
+                    }
+                } else if (initialVoterSet.isPresent()) {
+                    for (final var controllerNode : 
initialVoterSet.get().entrySet()) {
+                        final var voterId = controllerNode.getKey();
+                        final var voterDirectoryid = controllerNode.getValue();

Review Comment:
   `voterDirectoryid` -> `voterDirectoryId`



##########
raft/src/main/java/org/apache/kafka/raft/RaftUtil.java:
##########
@@ -524,14 +526,16 @@ public static AddRaftVoterRequestData addVoterRequest(
         String clusterId,
         int timeoutMs,
         ReplicaKey voter,
-        Endpoints listeners
+        Endpoints listeners,
+        boolean ackWhenCommitted
     ) {
         return new AddRaftVoterRequestData()
             .setClusterId(clusterId)
             .setTimeoutMs(timeoutMs)

Review Comment:
   I didn't see any usage of the `timeout`. Is that expected?



##########
core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java:
##########
@@ -164,4 +169,69 @@ public void testRemoveAndAddSameController() throws 
Exception {
             }
         }
     }
+
+    @Test
+    public void testControllersAutoJoinStandaloneVoter() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
+            build();
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+            setStandalone(true).
+            build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), 
voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        
assertEquals(nodes.controllerNodes().get(replicaId).metadataDirectoryId(), 
voters.get(replicaId));
+                    }
+                });
+            }
+        }
+    }
+
+    @Test
+    public void testNewVoterAutoRemovesAndAdds() throws Exception {
+        final var nodes = new TestKitNodes.Builder().
+            setNumBrokerNodes(1).
+            setNumControllerNodes(3).
+            setFeature(KRaftVersion.FEATURE_NAME, 
KRaftVersion.KRAFT_VERSION_1.featureLevel()).
+            build();
+
+        // Configure the initial voters with one voter having a different 
directory ID.
+        // This simulates the case where the controller failed and is brought 
back up with a different directory ID.
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        final var oldDirectoryId = Uuid.randomUuid();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.id() == TestKitDefaults.CONTROLLER_ID_OFFSET ?
+                    oldDirectoryId : controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).
+            setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
+            setInitialVoterSet(initialVoters).
+            build()
+        ) {
+            cluster.format();
+            cluster.startup();
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(new HashSet<>(List.of(3000, 3001, 3002)), 
voters.keySet());

Review Comment:
   ```java
   Set.of(3000, 3001, 3002)
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3271,13 +3317,55 @@ private long pollFollowerAsVoter(FollowerState state, 
long currentTimeMs) {
             backoffMs,
             Math.min(
                 state.remainingFetchTimeMs(currentTimeMs),
-                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+                state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
             )
         );
     }
 
+    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, 
long currentTimeMs) {
+        /* When the cluster supports reconfiguration, only replicas that can 
become a voter
+         * and are configured to auto join should attempt to automatically 
join the voter
+         * set for the configured topic partition.
+         */
+        return partitionState.lastKraftVersion().isReconfigSupported() && 
canBecomeVoter &&
+            quorumConfig.autoJoin() && 
state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
+    }
+
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
-        return maybeSendFetchToBestNode(state, currentTimeMs);
+        GracefulShutdown shutdown = this.shutdown.get();
+        final long backoffMs;
+        if (shutdown != null) {
+            // If we are an observer, then we can shutdown immediately. We 
want to
+            // skip potentially sending any add or remove voter RPCs.
+            backoffMs = 0;
+        } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+            final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+            final var voters = partitionState.lastVoterSet();
+            final RequestSendResult sendResult;
+            if (voters.voterIds().contains(localReplicaKey.id())) {
+                /* Replica id is in the voter set but replica is not voter. 
Remove old voter.
+                 * Local replica is not in the voter set because the replica 
is an observer.

Review Comment:
   Perhaps the comment could remind readers that, in this path, the directory 
ID must be different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to