aloknnikhil commented on a change in pull request #9996:
URL: https://github.com/apache/kafka/pull/9996#discussion_r568248443



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -98,7 +98,8 @@
     UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
     ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
     FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
-    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);
+    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
+    DECOMMISSION_BROKER(ApiMessageType.DECOMMISSION_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, true);

Review comment:
       Ah, right. I'm curious, then: what would happen if we send a decommission request through the broker being decommissioned? Wouldn't the broker shut down (as a result of being unregistered) before responding to the AdminClient call?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -123,6 +124,9 @@
     /** indicates whether the API is enabled for forwarding **/
     public final boolean forwardable;
 
+    /** indicates if this API is exclusive to the KIP-500 mode **/
+    public final boolean isKip500OnlyApi;

Review comment:
       @hachikuji I am not entirely sure we need this additional flag, but I don't think there's a cleaner way to mark certain APIs as being available only under KIP-500.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -123,6 +124,9 @@
     /** indicates whether the API is enabled for forwarding **/
     public final boolean forwardable;
 
+    /** indicates if this API is exclusive to the KIP-500 mode **/
+    public final boolean isKip500OnlyApi;

Review comment:
       Spoke offline. Will mark the API as `controllerOnly` for now so that it is not advertised as part of the supported broker APIs. We will allow the broker to process requests for this API once the KIP-500 controller integration is complete.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1485,6 +1485,43 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Unregisters a broker.
+     * <p>
+     * This operation is supported only on self-managed Kafka clusters.

Review comment:
       Done.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1485,6 +1485,43 @@ default DescribeFeaturesResult describeFeatures() {
      */
     UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
 
+    /**
+     * Unregisters a broker.

Review comment:
       Right. I guess at some point the comment and the name of the API diverged. Updated the comment to reflect the use of the API.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4605,6 +4608,47 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    @Override
+    public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final Call call = new Call("decommissionBroker", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
+                DecommissionBrokerRequestData data =
+                    new DecommissionBrokerRequestData().setBrokerId(brokerId);
+                return new DecommissionBrokerRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DecommissionBrokerResponse response =
+                    (DecommissionBrokerResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        future.complete(null);
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(error);

Review comment:
       Hmm. Good point. We don't need a special case for REQUEST_TIMED_OUT, then. Will update the test and the case here.
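       One way the simplified handler could look without that special case (a sketch mirroring the snippet above, with `REQUEST_TIMED_OUT` falling through to the generic error path):

```java
@Override
void handleResponse(AbstractResponse abstractResponse) {
    final DecommissionBrokerResponse response =
        (DecommissionBrokerResponse) abstractResponse;
    Errors error = Errors.forCode(response.data().errorCode());
    switch (error) {
        case NONE:
            future.complete(null);
            break;
        case NOT_CONTROLLER:
            // Refresh metadata and retry against the current controller.
            handleNotControllerError(error);
            break;
        default:
            // REQUEST_TIMED_OUT now surfaces to the caller like any other error.
            future.completeExceptionally(error.exception());
            break;
    }
}
```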

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -101,7 +101,10 @@
     DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER),
     DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
     BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-    BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true);
+    BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+
+    // Once we have the controller integration for supporting broker decommissioning, we will support forwarding from the broker

Review comment:
       Ack.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5169,6 +5171,72 @@ public void testShouldRefreshCoordinator() {
         assertTrue(ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts));
     }
 
+    @Test
+    public void testDecommissionBrokerSuccess() throws InterruptedException, ExecutionException {
+        int decommissionedBrokerNode = 1;
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.NONE, 0));
+
+            DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+
+            // Validate response
+            assertNotNull(result.all());
+            result.all().get();
+        }
+    }
+
+    @Test
+    public void testDecommissionBrokerFailure() {
+        int decommissionedBrokerNode = 1;
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
+            env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.BROKER_NOT_AVAILABLE, 0));

Review comment:
       Makes sense. Updated.
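       Illustrative only: one way the updated failure assertion could look, assuming the `TestUtils.assertFutureError` helper is available to this test class:

```java
DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);

// The future should fail with the exception mapped from BROKER_NOT_AVAILABLE.
TestUtils.assertFutureError(result.all(), BrokerNotAvailableException.class);
```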

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -235,6 +235,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.FETCH_SNAPSHOT => requestHelper.closeConnection(request, util.Collections.emptyMap())
         case ApiKeys.BROKER_REGISTRATION => requestHelper.closeConnection(request, util.Collections.emptyMap())
         case ApiKeys.BROKER_HEARTBEAT => requestHelper.closeConnection(request, util.Collections.emptyMap())
+
+        // We need to fix this to forward the request to the active controller.

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-12275

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4605,6 +4609,56 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    private Call getDecommissionBrokerCall(DecommissionBrokerOperationContext<Void> context) {
+        return new Call("decommissionBroker", context.deadline(),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
+                DecommissionBrokerRequestData data =
+                        new DecommissionBrokerRequestData().setBrokerId(context.brokerId());
+                return new DecommissionBrokerRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DecommissionBrokerResponse response =
+                        (DecommissionBrokerResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        context.future().complete(null);
+                        break;
+                    case REQUEST_TIMED_OUT:
+                        Call nextCall = getDecommissionBrokerCall(context);

Review comment:
       Ah. That's much easier :) 
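       Presumably the simpler approach is to throw the retriable exception from `handleResponse` and let the admin client's `Call` machinery handle the retry and backoff, e.g.:

```java
case REQUEST_TIMED_OUT:
    // Throwing a retriable exception lets the Call framework re-enqueue
    // the request with its own tries/backoff handling, instead of
    // constructing and scheduling nextCall manually.
    throw error.exception();
```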

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4605,6 +4609,56 @@ void handleFailure(Throwable throwable) {
         return new UpdateFeaturesResult(new HashMap<>(updateFutures));
     }
 
+    private Call getDecommissionBrokerCall(DecommissionBrokerOperationContext<Void> context) {
+        return new Call("decommissionBroker", context.deadline(),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
+                DecommissionBrokerRequestData data =
+                        new DecommissionBrokerRequestData().setBrokerId(context.brokerId());
+                return new DecommissionBrokerRequest.Builder(data);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final DecommissionBrokerResponse response =
+                        (DecommissionBrokerResponse) abstractResponse;
+                Errors error = Errors.forCode(response.data().errorCode());
+                switch (error) {
+                    case NONE:
+                        context.future().complete(null);
+                        break;
+                    case REQUEST_TIMED_OUT:
+                        Call nextCall = getDecommissionBrokerCall(context);
+                        nextCall.tries = super.tries + 1;
+                        nextCall.nextAllowedTryMs = calculateNextAllowedRetryMs();
+                        runnable.call(nextCall, time.milliseconds());
+                        break;
+                    default:
+                        context.future().completeExceptionally(error.exception());

Review comment:
       Ack.



