hachikuji commented on a change in pull request #9996:
URL: https://github.com/apache/kafka/pull/9996#discussion_r568261855



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -98,7 +98,8 @@
     UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
     ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
     FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
-    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);
+    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
+    DECOMMISSION_BROKER(ApiMessageType.DECOMMISSION_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, true);

Review comment:
       I am not sure. It seems like the decommissioned broker should wait 
around long enough to give the controller a chance to move replicas elsewhere.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1485,6 +1485,43 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Unregisters a broker.

Review comment:
       What does this mean exactly? The verb "decommission" sounds like we are 
permanently removing the broker and reassigning any partitions that were 
assigned to it. 

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1485,6 +1485,43 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Unregisters a broker.
+     * <p>
+     * This operation is supported only on self-managed Kafka clusters.

Review comment:
       Let's emphasize that this means brokers which do not use Zookeeper.
   
   > This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
   
   Same thing for the other API.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -101,7 +101,10 @@
     DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
     DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
     BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true);
+    BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+
+    // Once we have the controller integration for supporting broker decommissioning, we will support forwarding from the broker

Review comment:
       Maybe emphasize that we have temporarily marked this as 
"controller-only" as a short-term workaround to avoid exposing it on zk-based 
brokers.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4605,6 +4608,47 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call("decommissionBroker", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
+                DecommissionBrokerRequestData data =
+                    new DecommissionBrokerRequestData().setBrokerId(brokerId);
+                return new DecommissionBrokerRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DecommissionBrokerResponse response =
+                    (DecommissionBrokerResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        future.complete(null);
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);

Review comment:
       Do we actually expect to get back `NOT_CONTROLLER`? I would have thought 
that this error would be swallowed by the broker when it forwards the request. 
In the current `ForwardingManager`, the broker will try to send the request to 
the controller. If the controller returns `NOT_CONTROLLER`, it will just retry. 
It will continue doing so until it ultimately times out. So maybe 
`REQUEST_TIMED_OUT` is the one we should be looking for?
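   A minimal, framework-free sketch of the error handling this comment suggests. The class, enum, and method names here are hypothetical illustrations, not the real `Errors` type or KafkaAdminClient internals: the forwarding broker retries `NOT_CONTROLLER` internally, so the client-side handler mainly needs to distinguish success, a retriable timeout, and fatal errors.

```java
// Hypothetical sketch: classify a response error code into the action the
// admin client should take, per the review discussion above.
public class DecommissionErrorSketch {

    enum Outcome { COMPLETE, RETRY, FAIL }

    static Outcome classify(String errorCode) {
        switch (errorCode) {
            case "NONE":
                return Outcome.COMPLETE;  // request succeeded; complete the future
            case "REQUEST_TIMED_OUT":
                return Outcome.RETRY;     // forwarding broker gave up; client retries
            default:
                return Outcome.FAIL;      // unexpected error; fail the future
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("NONE"));              // COMPLETE
        System.out.println(classify("REQUEST_TIMED_OUT")); // RETRY
        // NOT_CONTROLLER lands in the default arm here, though per the
        // comment it should normally be swallowed server-side by forwarding.
        System.out.println(classify("NOT_CONTROLLER"));    // FAIL
    }
}
```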

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -235,6 +235,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.FETCH_SNAPSHOT => requestHelper.closeConnection(request, util.Collections.emptyMap())
         case ApiKeys.BROKER_REGISTRATION => requestHelper.closeConnection(request, util.Collections.emptyMap())
         case ApiKeys.BROKER_HEARTBEAT => requestHelper.closeConnection(request, util.Collections.emptyMap())
+
+        // We need to fix this to forward the request to the active controller.

Review comment:
       We can move this comment to a JIRA ticket.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5169,6 +5171,72 @@ public void testShouldRefreshCoordinator() {
         assertTrue(ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts));
     }
 
+    @Test
+    public void testDecommissionBrokerSuccess() throws InterruptedException, ExecutionException {
+        int decommissionedBrokerNode = 1;
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.NONE, 0));
+
+            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+
+            // Validate response
+            assertNotNull(result.all());
+            result.all().get();
+        }
+    }
+
+    @Test
+    public void testDecommissionBrokerFailure() {
+        int decommissionedBrokerNode = 1;
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.BROKER_NOT_AVAILABLE, 0));

Review comment:
       Hmm... I would actually expect `BROKER_NOT_AVAILABLE` to be retriable. Maybe we should use an error code like `UNKNOWN_SERVER_ERROR` instead.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4605,6 +4609,56 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    private Call getDecommissionBrokerCall(DecommissionBrokerOperationContext<Void> context) {
+        return new Call("decommissionBroker", context.deadline(),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
+                DecommissionBrokerRequestData data =
+                        new DecommissionBrokerRequestData().setBrokerId(context.brokerId());
+                return new DecommissionBrokerRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DecommissionBrokerResponse response =
+                        (DecommissionBrokerResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        context.future().complete(null);
+                        break;
+                    case REQUEST_TIMED_OUT:
+                        Call nextCall = getDecommissionBrokerCall(context);

Review comment:
       I think you can just raise the exception? It will get caught and 
`Call.fail` will be called. This does all the `tries` bookkeeping and will 
resubmit it if the exception is retriable (which `TimeoutException` is).
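   A small, self-contained sketch of the pattern this comment describes. The names (`RetrySketch`, `RetriableTimeout`, `runWithRetries`) are hypothetical stand-ins, not the actual `Call`/`Call.fail` machinery: the response handler just throws the retriable exception, and a single generic loop owns the `tries` bookkeeping and resubmission.

```java
// Hypothetical sketch: retry-via-exception instead of hand-building the
// follow-up call in the response handler.
public class RetrySketch {

    // Stand-in for a retriable timeout exception.
    static class RetriableTimeout extends RuntimeException {}

    interface Handler {
        void handle(int attempt);
    }

    // Generic loop: catches the retriable exception and resubmits, keeping
    // all the attempt bookkeeping in one place (backoff omitted for brevity).
    static int runWithRetries(Handler handler, int maxTries) {
        for (int tries = 1; tries <= maxTries; tries++) {
            try {
                handler.handle(tries);
                return tries; // success: report how many attempts were used
            } catch (RetriableTimeout e) {
                // retriable failure: loop around and try again
            }
        }
        throw new RuntimeException("exhausted " + maxTries + " retries");
    }

    public static void main(String[] args) {
        // Simulate a REQUEST_TIMED_OUT-style failure on the first two
        // attempts, then success on the third.
        int attempts = runWithRetries(attempt -> {
            if (attempt < 3) {
                throw new RetriableTimeout();
            }
        }, 5);
        System.out.println("attempts=" + attempts); // attempts=3
    }
}
```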

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4605,6 +4609,56 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    private Call getDecommissionBrokerCall(DecommissionBrokerOperationContext<Void> context) {
+        return new Call("decommissionBroker", context.deadline(),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
+                DecommissionBrokerRequestData data =
+                        new DecommissionBrokerRequestData().setBrokerId(context.brokerId());
+                return new DecommissionBrokerRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DecommissionBrokerResponse response =
+                        (DecommissionBrokerResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        context.future().complete(null);
+                        break;
+                    case REQUEST_TIMED_OUT:
+                        Call nextCall = getDecommissionBrokerCall(context);
+                        nextCall.tries = super.tries + 1;
+                        nextCall.nextAllowedTryMs = calculateNextAllowedRetryMs();
+                        runnable.call(nextCall, time.milliseconds());
+                        break;
+                    default:
+                        context.future().completeExceptionally(error.exception());

Review comment:
       Can we log an error message here which includes the request information 
and the unexpected error?
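   A sketch of the diagnostic this comment asks for. `ErrorLogSketch` and `describeFailure` are hypothetical names; the point is that the message carries the request name, the target broker, and the unexpected error code before the future is failed.

```java
// Hypothetical sketch: build the log message for an unexpected error in
// the default arm of the response handler's switch.
public class ErrorLogSketch {

    // Include the request name, the broker it targeted, and the error code,
    // so an operator can correlate the failed future with server-side logs.
    static String describeFailure(String api, int brokerId, String errorCode) {
        return String.format(
                "%s request for broker %d failed with unexpected error %s",
                api, brokerId, errorCode);
    }

    public static void main(String[] args) {
        // In a real handler this string would go to the client logger right
        // before completing the future exceptionally.
        System.out.println(describeFailure("DecommissionBroker", 1, "UNKNOWN_SERVER_ERROR"));
    }
}
```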



