ahuang98 commented on code in PR #22191:
URL: https://github.com/apache/kafka/pull/22191#discussion_r3286013338


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -9933,97 +9937,147 @@ public void 
testDescribeReplicaLogDirsWithNonExistReplica() throws Exception {
         }
     }
 
+    private static final int UNREGISTER_NODE_ID = 1;
+
+    private static final Function<Errors, AbstractResponse> 
BROKER_RESPONSE_FACTORY =
+        error -> new UnregisterBrokerResponse(new 
UnregisterBrokerResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(error.message()));
+
+    private static final Function<Errors, AbstractResponse> 
CONTROLLER_RESPONSE_FACTORY =
+        error -> new UnregisterControllerResponse(new 
UnregisterControllerResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(error.message()));
+
+    private static final Function<Admin, KafkaFuture<Void>> 
UNREGISTER_BROKER_CALL =
+        admin -> admin.unregisterBroker(UNREGISTER_NODE_ID).all();
+
+    private static final Function<Admin, KafkaFuture<Void>> 
UNREGISTER_CONTROLLER_CALL =
+        admin -> admin.unregisterController(UNREGISTER_NODE_ID).all();
+
+    private void runUnregisterScenario(
+        AdminClientUnitTestEnv env,
+        ApiKeys apiKey,
+        Function<Errors, AbstractResponse> responseFactory,
+        Function<Admin, KafkaFuture<Void>> adminCall,
+        List<Errors> responsesToPrepare,
+        Class<? extends Throwable> expectedException
+    ) throws ExecutionException, InterruptedException {

Review Comment:
   This added abstraction feels a bit heavy. I see the similarities between 
unregister_broker and unregister_controller, but IMO it makes it too hard to 
see what each test is actually doing.



##########
core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala:
##########
@@ -103,7 +103,8 @@ class ControllerRegistrationManagerTest {
     prevImage: MetadataImage,
     manager: ControllerRegistrationManager,
     metadataVersion: MetadataVersion,
-    registrationModifier: RegisterControllerRecord => 
Option[RegisterControllerRecord]
+    registrationModifier: RegisterControllerRecord => 
Option[RegisterControllerRecord],
+    unregisterModifier: UnregisterControllerRecord => 
Option[UnregisterControllerRecord]

Review Comment:
   I know you're just extending the same semantics, but these modifier 
functions are hard to reason about - can we add comments/examples?



##########
core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala:
##########
@@ -318,4 +336,86 @@ class ControllerRegistrationManagerTest {
       manager.close()
     }
   }
+
+  @Test
+  def testReRegistrationAfterUnregister(): Unit = {
+    val context = new RegistrationTestContext(configProperties)
+    val manager = newControllerRegistrationManager(context)
+    try {
+      context.controllerNodeProvider.node.set(controller1)
+      manager.start(context.mockChannelManager)
+
+      // initial state with controller registered in log
+      val image = doMetadataUpdate(MetadataImage.EMPTY,
+        manager,
+        MetadataVersion.IBP_3_7_IV0,
+        r => Some(r),
+        _ => None
+      )
+      assertTrue(registeredInLog(manager))
+
+      // Now unregister the controller via an UnregisterControllerRecord.
+      doMetadataUpdate(image,
+        manager,
+        MetadataVersion.IBP_3_7_IV0,
+        _ => None,
+        r => if (r.controllerId() == 1) Some(r) else None
+      )
+      assertFalse(registeredInLog(manager))
+
+      // The local manager should send a new registration RPC.
+      assertEquals((true, 0, 0), rpcStats(manager))
+    } finally {
+      manager.close()
+    }
+  }
+
+  @Test
+  def testReRegistrationAfterDifferentIncarnationId(): Unit = {

Review Comment:
   Just confirming: is this new behavior, or existing behavior that we're 
only now adding test coverage for?



##########
clients/src/testFixtures/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -1366,6 +1366,17 @@ public UnregisterBrokerResult unregisterBroker(int 
brokerId, UnregisterBrokerOpt
         }
     }
 
+    @Override
+    public UnregisterControllerResult unregisterController(int controllerId, 
UnregisterControllerOptions options) {
+        if (usingRaftController) {

Review Comment:
   interesting, seems like we have more dead code to remove



##########
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##########
@@ -96,6 +98,11 @@ static void execute(String... args) throws Exception {
                 .action(store())
                 .required(true)
                 .help("The ID of the broker to unregister.");
+        unregisterControllerParser.addArgument("--controller-id", "-i")

Review Comment:
   thoughts on just using "--id" here given that the subcommand is pretty 
self-explanatory ("unregister-controller")?



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -873,6 +878,23 @@ public void 
testUnregisterBrokerResponseWithUnknownServerError() {
         assertEquals(customerErrorMessage, response.data().errorMessage());
     }
 
+    @Test
+    public void testUnregisterControllerResponseWithUnknownServerError() {
+        UnregisterControllerRequest request = new 
UnregisterControllerRequest.Builder(
+            new UnregisterControllerRequestData()
+        ).build((short) 0);
+        String customerErrorMessage = "customer error message";

Review Comment:
   "customer" feels like an odd choice here - did you mean "custom"?



##########
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##########
@@ -494,12 +503,56 @@ static void handleRemoveController(
             throw new TerseException("Failed to parse 
--controller-directory-id: " + e.getMessage());
         }
         if (!dryRun) {
-            admin.removeRaftVoter(controllerId, directoryId).
-                all().get();
+            removeRaftVoter(admin, controllerId, directoryId, unregister);
         }
         System.out.printf("%s KRaft controller %d with directory id %s%n",
             dryRun ? "DRY RUN of removing " : "Removed ",
             controllerId,
             directoryId);
+        if (unregister) {
+            if (!dryRun) {
+                unregisterController(admin, controllerId);
+            }
+            System.out.printf("%s KRaft controller %d%n",
+                dryRun ? "DRY RUN of unregistering " : "Unregistered ",
+                controllerId);
+        }
+    }
+
+    private static void removeRaftVoter(
+        Admin admin,
+        int controllerId,
+        Uuid directoryId,
+        boolean unregister
+    ) throws TerseException, ExecutionException, InterruptedException {
+        try {
+            admin.removeRaftVoter(controllerId, directoryId).all().get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (unregister && (cause instanceof UnsupportedVersionException ||
+                cause instanceof VoterNotFoundException)) {
+                throw new TerseException("Failed to remove KRaft voter " + 
controllerId
+                    + ": " + cause.getMessage()
+                    + ". To unregister the controller from the cluster, run "
+                    + "`kafka-cluster.sh unregister-controller --controller-id 
"
+                    + controllerId + "`.");
+            }
+            throw e;
+        }
+    }
+
+    private static void unregisterController(Admin admin, int controllerId)
+            throws TerseException, InterruptedException {
+        try {
+            admin.unregisterController(controllerId).all().get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new TerseException("Removed KRaft voter " + controllerId
+                + " but failed to unregister it: "

Review Comment:
   This doesn't seem like the right place to log a successful KRaft voter 
removal.



##########
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##########
@@ -169,6 +182,20 @@ static void unregisterCommand(PrintStream stream, Admin 
adminClient, int id) thr
         }
     }
 
+    static void unregisterControllerCommand(PrintStream stream, Admin 
adminClient, int id) throws Exception {
+        try {
+            adminClient.unregisterController(id).all().get();
+            stream.println("Controller " + id + " is no longer registered.");
+        } catch (ExecutionException ee) {
+            Throwable cause = ee.getCause();
+            if (cause instanceof UnsupportedVersionException) {
+                stream.println("The target cluster does not support the 
controller unregistration API.");

Review Comment:
   is the original exception message not descriptive enough?



##########
tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandUnitTest.java:
##########
@@ -51,6 +61,22 @@ public void testRemoveControllerDryRun() {
             "Failed to find expected output in stdout: " + outputs);
     }
 
+    @Test
+    public void testRemoveControllerDryRunWithUnregister() {
+        List<String> outputs = List.of(
+            ToolsTestUtils.captureStandardOut(() ->
+                assertEquals(0, 
MetadataQuorumCommand.mainNoExit("--bootstrap-server", "localhost:9092",
+                    "remove-controller",
+                    "--controller-id", "2",
+                    "--controller-directory-id", "_KWDkTahTVaiVVVTaugNew",
+                    "--dry-run",
+                    "--unregister"))).split("\n"));
+        assertTrue(outputs.contains("DRY RUN of removing  KRaft controller 2 
with directory id _KWDkTahTVaiVVVTaugNew"),
+            "Failed to find expected output in stdout: " + outputs);
+        assertTrue(outputs.contains("DRY RUN of unregistering  KRaft 
controller 2"),

Review Comment:
   nit: looks like we are printing extra whitespace - the "%s KRaft ..." 
format string adds a space after a prefix that already ends in one



##########
tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java:
##########
@@ -494,12 +503,56 @@ static void handleRemoveController(
             throw new TerseException("Failed to parse 
--controller-directory-id: " + e.getMessage());
         }
         if (!dryRun) {
-            admin.removeRaftVoter(controllerId, directoryId).
-                all().get();
+            removeRaftVoter(admin, controllerId, directoryId, unregister);
         }
         System.out.printf("%s KRaft controller %d with directory id %s%n",
             dryRun ? "DRY RUN of removing " : "Removed ",
             controllerId,
             directoryId);
+        if (unregister) {
+            if (!dryRun) {
+                unregisterController(admin, controllerId);
+            }
+            System.out.printf("%s KRaft controller %d%n",
+                dryRun ? "DRY RUN of unregistering " : "Unregistered ",
+                controllerId);
+        }
+    }
+
+    private static void removeRaftVoter(
+        Admin admin,
+        int controllerId,
+        Uuid directoryId,
+        boolean unregister
+    ) throws TerseException, ExecutionException, InterruptedException {
+        try {
+            admin.removeRaftVoter(controllerId, directoryId).all().get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (unregister && (cause instanceof UnsupportedVersionException ||
+                cause instanceof VoterNotFoundException)) {
+                throw new TerseException("Failed to remove KRaft voter " + 
controllerId
+                    + ": " + cause.getMessage()
+                    + ". To unregister the controller from the cluster, run "
+                    + "`kafka-cluster.sh unregister-controller --controller-id 
"
+                    + controllerId + "`.");

Review Comment:
   hm, should we include the original exception so the client knows why 
removeRaftVoter failed? 
   
   also, if the removal fails due to UnsupportedVersionException then wouldn't 
unregisterController also be unsupported?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to