chia7712 commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r501418496



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
                 .hi(password, salt, iterations);
     }
 
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        final Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private FeatureMetadata createFeatureMetadata(final 
ApiVersionsResponse response) {
+                final Map<String, FinalizedVersionRange> finalizedFeatures = 
new HashMap<>();
+                for (final FinalizedFeatureKey key : 
response.data().finalizedFeatures().valuesSet()) {
+                    finalizedFeatures.put(key.name(), new 
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+                }
+
+                Optional<Long> finalizedFeaturesEpoch;
+                if (response.data().finalizedFeaturesEpoch() >= 0L) {
+                    finalizedFeaturesEpoch = 
Optional.of(response.data().finalizedFeaturesEpoch());
+                } else {
+                    finalizedFeaturesEpoch = Optional.empty();
+                }
+
+                final Map<String, SupportedVersionRange> supportedFeatures = 
new HashMap<>();
+                for (final SupportedFeatureKey key : 
response.data().supportedFeatures().valuesSet()) {
+                    supportedFeatures.put(key.name(), new 
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+                }
+
+                return new FeatureMetadata(finalizedFeatures, 
finalizedFeaturesEpoch, supportedFeatures);
+            }
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    
future.complete(createFeatureMetadata(apiVersionsResponse));
+                } else if (options.sendRequestToController() &&
+                           apiVersionsResponse.data.errorCode() == 
Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(final Map<String, 
FeatureUpdate> featureUpdates,
+                                               final UpdateFeaturesOptions 
options) {
+        if (featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        for (final Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+                    = new 
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+                for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+                    final String feature = entry.getKey();
+                    final FeatureUpdate update = entry.getValue();
+                    if (feature.trim().isEmpty()) {

Review comment:
       the error message says it can't be null but there is no null check.
   for another, this check can happen early (when creating ```updateFutures```)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to