kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463912157



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
         return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final 
UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+            = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+        for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            final String feature = entry.getKey();
+            final FeatureUpdate update = entry.getValue();
+            if (feature.trim().isEmpty()) {
+                throw new IllegalArgumentException("Provided feature can not 
be null or empty.");
+            }
+
+            updateFutures.put(feature, new KafkaFutureImpl<>());
+            final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
+                new UpdateFeaturesRequestData.FeatureUpdateKey();
+            requestItem.setFeature(feature);
+            requestItem.setMaxVersionLevel(update.maxVersionLevel());
+            requestItem.setAllowDowngrade(update.allowDowngrade());
+            featureUpdatesRequestData.add(requestItem);
+        }
+        final UpdateFeaturesRequestData request = new 
UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData);
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : 
response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();
+                    }
+                }
+
+                for (UpdatableFeatureResult result : 
response.data().results()) {
+                    final KafkaFutureImpl<Void> future = 
updateFutures.get(result.feature());
+                    if (future == null) {

Review comment:
       It does not overlap. This checks for unexpected responses for features 
that we never intended to update. `completeUnrealizedFutures` is for futures 
that we never got a response for from the server -- we need to complete such 
futures exceptionally.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to