kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r501575432



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
                 .hi(password, salt, iterations);
     }
 
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        final Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private FeatureMetadata createFeatureMetadata(final 
ApiVersionsResponse response) {
+                final Map<String, FinalizedVersionRange> finalizedFeatures = 
new HashMap<>();
+                for (final FinalizedFeatureKey key : 
response.data().finalizedFeatures().valuesSet()) {
+                    finalizedFeatures.put(key.name(), new 
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+                }
+
+                Optional<Long> finalizedFeaturesEpoch;
+                if (response.data().finalizedFeaturesEpoch() >= 0L) {
+                    finalizedFeaturesEpoch = 
Optional.of(response.data().finalizedFeaturesEpoch());
+                } else {
+                    finalizedFeaturesEpoch = Optional.empty();
+                }
+
+                final Map<String, SupportedVersionRange> supportedFeatures = 
new HashMap<>();
+                for (final SupportedFeatureKey key : 
response.data().supportedFeatures().valuesSet()) {
+                    supportedFeatures.put(key.name(), new 
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+                }
+
+                return new FeatureMetadata(finalizedFeatures, 
finalizedFeaturesEpoch, supportedFeatures);
+            }
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    
future.complete(createFeatureMetadata(apiVersionsResponse));
+                } else if (options.sendRequestToController() &&
+                           apiVersionsResponse.data.errorCode() == 
Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(final Map<String, 
FeatureUpdate> featureUpdates,
+                                               final UpdateFeaturesOptions 
options) {
+        if (featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        for (final Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+                    = new 
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+                for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+                    final String feature = entry.getKey();
+                    final FeatureUpdate update = entry.getValue();
+                    if (feature.trim().isEmpty()) {

Review comment:
       Done. Addressed in #9393.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScram
     AlterUserScramCredentialsResult 
alterUserScramCredentials(List<UserScramCredentialAlteration> alterations,
                                                               
AlterUserScramCredentialsOptions options);
 
+    /**
+     * Describes finalized as well as supported features. By default, the 
request is issued to any
+     * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+     * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+     * finalized features.
+     * <p>
+     * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+     * returned {@link DescribeFeaturesResult}:
+     * <ul>
+     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *   If the request timed out before the describe operation could 
finish.</li>
+     * </ul>
+     * <p>
+     * @param options   the options to use
+     *
+     * @return          the {@link DescribeFeaturesResult} containing the 
result
+     */
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
       Done. Addressed in #9393.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
                 .hi(password, salt, iterations);
     }
 
+    public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+        final Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+            private FeatureMetadata createFeatureMetadata(final 
ApiVersionsResponse response) {
+                final Map<String, FinalizedVersionRange> finalizedFeatures = 
new HashMap<>();
+                for (final FinalizedFeatureKey key : 
response.data().finalizedFeatures().valuesSet()) {
+                    finalizedFeatures.put(key.name(), new 
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+                }
+
+                Optional<Long> finalizedFeaturesEpoch;
+                if (response.data().finalizedFeaturesEpoch() >= 0L) {
+                    finalizedFeaturesEpoch = 
Optional.of(response.data().finalizedFeaturesEpoch());
+                } else {
+                    finalizedFeaturesEpoch = Optional.empty();
+                }
+
+                final Map<String, SupportedVersionRange> supportedFeatures = 
new HashMap<>();
+                for (final SupportedFeatureKey key : 
response.data().supportedFeatures().valuesSet()) {
+                    supportedFeatures.put(key.name(), new 
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+                }
+
+                return new FeatureMetadata(finalizedFeatures, 
finalizedFeaturesEpoch, supportedFeatures);
+            }
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+                    
future.complete(createFeatureMetadata(apiVersionsResponse));
+                } else if (options.sendRequestToController() &&
+                           apiVersionsResponse.data.errorCode() == 
Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), 
throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(final Map<String, 
FeatureUpdate> featureUpdates,
+                                               final UpdateFeaturesOptions 
options) {
+        if (featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+        }
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+        for (final Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+            updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+        }
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+                    = new 
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+                for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+                    final String feature = entry.getKey();
+                    final FeatureUpdate update = entry.getValue();
+                    if (feature.trim().isEmpty()) {
+                        throw new IllegalArgumentException("Provided feature 
can not be null or empty.");
+                    }
+
+                    final UpdateFeaturesRequestData.FeatureUpdateKey 
requestItem =
+                        new UpdateFeaturesRequestData.FeatureUpdateKey();
+                    requestItem.setFeature(feature);
+                    requestItem.setMaxVersionLevel(update.maxVersionLevel());
+                    requestItem.setAllowDowngrade(update.allowDowngrade());
+                    featureUpdatesRequestData.add(requestItem);
+                }
+                return new UpdateFeaturesRequest.Builder(
+                    new UpdateFeaturesRequestData()
+                        .setTimeoutMs(timeoutMs)
+                        .setFeatureUpdates(featureUpdatesRequestData));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                Errors topLevelError = 
Errors.forCode(response.data().errorCode());
+                switch (topLevelError) {
+                    case NONE:
+                        for (final UpdatableFeatureResult result : 
response.data().results()) {
+                            final KafkaFutureImpl<Void> future = 
updateFutures.get(result.feature());
+                            if (future == null) {
+                                log.warn("Server response mentioned unknown 
feature {}", result.feature());
+                            } else {
+                                final Errors error = 
Errors.forCode(result.errorCode());
+                                if (error == Errors.NONE) {
+                                    future.complete(null);
+                                } else {
+                                    
future.completeExceptionally(error.exception(result.errorMessage()));
+                                }
+                            }
+                        }
+                        // The server should send back a response for every 
feature, but we do a sanity check anyway.
+                        
completeUnrealizedFutures(updateFutures.entrySet().stream(),
+                            feature -> "The controller response did not 
contain a result for feature " + feature);
+                        break;
+                    case NOT_CONTROLLER:
+                        handleNotControllerError(topLevelError);
+                        break;
+                    default:
+                        for (final Map.Entry<String, KafkaFutureImpl<Void>> 
entry : updateFutures.entrySet()) {
+                            
entry.getValue().completeExceptionally(topLevelError.exception());

Review comment:
       Done. Addressed in #9393.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to