chia7712 commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r501489060



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
                 .hi(password, salt, iterations);
     }
 
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        final Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private FeatureMetadata createFeatureMetadata(final 
ApiVersionsResponse response) {
+                final Map<String, FinalizedVersionRange> finalizedFeatures = 
new HashMap<>();
+                for (final FinalizedFeatureKey key : 
response.data().finalizedFeatures().valuesSet()) {
+                    finalizedFeatures.put(key.name(), new 
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+                }
+
+                Optional<Long> finalizedFeaturesEpoch;
+                if (response.data().finalizedFeaturesEpoch() >= 0L) {
+                    finalizedFeaturesEpoch = 
Optional.of(response.data().finalizedFeaturesEpoch());
+                } else {
+                    finalizedFeaturesEpoch = Optional.empty();
+                }
+
+                final Map<String, SupportedVersionRange> supportedFeatures = 
new HashMap<>();
+                for (final SupportedFeatureKey key : 
response.data().supportedFeatures().valuesSet()) {
+                    supportedFeatures.put(key.name(), new 
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+                }
+
+                return new FeatureMetadata(finalizedFeatures, 
finalizedFeaturesEpoch, supportedFeatures);
+            }
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    
future.complete(createFeatureMetadata(apiVersionsResponse));
+                } else if (options.sendRequestToController() &&
+                           apiVersionsResponse.data.errorCode() == 
Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(final Map<String, 
FeatureUpdate> featureUpdates,
+                                               final UpdateFeaturesOptions 
options) {
+        if (featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        for (final Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+                    = new 
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+                for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+                    final String feature = entry.getKey();
+                    final FeatureUpdate update = entry.getValue();
+                    if (feature.trim().isEmpty()) {
+                        throw new IllegalArgumentException("Provided feature 
can not be null or empty.");
+                    }
+
+                    final UpdateFeaturesRequestData.FeatureUpdateKey 
requestItem =
+                        new UpdateFeaturesRequestData.FeatureUpdateKey();
+                    requestItem.setFeature(feature);
+                    requestItem.setMaxVersionLevel(update.maxVersionLevel());
+                    requestItem.setAllowDowngrade(update.allowDowngrade());
+                    featureUpdatesRequestData.add(requestItem);
+                }
+                return new UpdateFeaturesRequest.Builder(
+                    new UpdateFeaturesRequestData()
+                        .setTimeoutMs(timeoutMs)
+                        .setFeatureUpdates(featureUpdatesRequestData));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                Errors topLevelError = 
Errors.forCode(response.data().errorCode());
+                switch (topLevelError) {
+                    case NONE:
+                        for (final UpdatableFeatureResult result : 
response.data().results()) {
+                            final KafkaFutureImpl<Void> future = 
updateFutures.get(result.feature());
+                            if (future == null) {
+                                log.warn("Server response mentioned unknown 
feature {}", result.feature());
+                            } else {
+                                final Errors error = 
Errors.forCode(result.errorCode());
+                                if (error == Errors.NONE) {
+                                    future.complete(null);
+                                } else {
+                                    
future.completeExceptionally(error.exception(result.errorMessage()));
+                                }
+                            }
+                        }
+                        // The server should send back a response for every 
feature, but we do a sanity check anyway.
+                        
completeUnrealizedFutures(updateFutures.entrySet().stream(),
+                            feature -> "The controller response did not 
contain a result for feature " + feature);
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(topLevelError);
+                        break;
+                    default:
+                        for (final Map.Entry<String, KafkaFutureImpl<Void>> 
entry : updateFutures.entrySet()) {
+                            
entry.getValue().completeExceptionally(topLevelError.exception());

Review comment:
       the top-level error message is not propagated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to