kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r501575432
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) { .hi(password, salt, iterations); } + public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { + final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final NodeProvider provider = + options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + + final Call call = new Call( + "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) { + final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>(); + for (final FinalizedFeatureKey key : response.data().finalizedFeatures().valuesSet()) { + finalizedFeatures.put(key.name(), new FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel())); + } + + Optional<Long> finalizedFeaturesEpoch; + if (response.data().finalizedFeaturesEpoch() >= 0L) { + finalizedFeaturesEpoch = Optional.of(response.data().finalizedFeaturesEpoch()); + } else { + finalizedFeaturesEpoch = Optional.empty(); + } + + final Map<String, SupportedVersionRange> supportedFeatures = new HashMap<>(); + for (final SupportedFeatureKey key : response.data().supportedFeatures().valuesSet()) { + supportedFeatures.put(key.name(), new SupportedVersionRange(key.minVersion(), key.maxVersion())); + } + + return new FeatureMetadata(finalizedFeatures, finalizedFeaturesEpoch, supportedFeatures); + } + + @Override + ApiVersionsRequest.Builder createRequest(int timeoutMs) { + return new ApiVersionsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse response) { + final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; + if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { + future.complete(createFeatureMetadata(apiVersionsResponse)); + } else if (options.sendRequestToController() && + apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { + handleNotControllerError(Errors.NOT_CONTROLLER); + } else { + future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(Collections.singletonList(future), throwable); + } + }; + + runnable.call(call, now); + return new DescribeFeaturesResult(future); + } + + @Override + public UpdateFeaturesResult updateFeatures(final Map<String, FeatureUpdate> featureUpdates, + final UpdateFeaturesOptions options) { + if (featureUpdates.isEmpty()) { + throw new IllegalArgumentException("Feature updates can not be null or empty."); + } + + final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>(); + for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + updateFutures.put(entry.getKey(), new KafkaFutureImpl<>()); + } + + final long now = time.milliseconds(); + final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { + final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData + = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection(); + for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + final String feature = entry.getKey(); + final FeatureUpdate update = entry.getValue(); + if (feature.trim().isEmpty()) { Review comment: Done. Addressed in #9393. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1306,6 +1307,73 @@ default AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScram AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations, AlterUserScramCredentialsOptions options); + /** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * <p> + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * <ul> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish.</li> + * </ul> + * <p> + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ + DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Done. Addressed in #9393. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) { .hi(password, salt, iterations); } + public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { + final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final NodeProvider provider = + options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + + final Call call = new Call( + "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + + private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) { + final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>(); + for (final FinalizedFeatureKey key : response.data().finalizedFeatures().valuesSet()) { + finalizedFeatures.put(key.name(), new FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel())); + } + + Optional<Long> finalizedFeaturesEpoch; + if (response.data().finalizedFeaturesEpoch() >= 0L) { + finalizedFeaturesEpoch = Optional.of(response.data().finalizedFeaturesEpoch()); + } else { + finalizedFeaturesEpoch = Optional.empty(); + } + + final Map<String, SupportedVersionRange> supportedFeatures = new HashMap<>(); + for (final SupportedFeatureKey key : response.data().supportedFeatures().valuesSet()) { + supportedFeatures.put(key.name(), new SupportedVersionRange(key.minVersion(), key.maxVersion())); + } + + return new FeatureMetadata(finalizedFeatures, finalizedFeaturesEpoch, supportedFeatures); + } + + @Override + ApiVersionsRequest.Builder createRequest(int timeoutMs) { + return new ApiVersionsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse response) { + final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; + if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { + future.complete(createFeatureMetadata(apiVersionsResponse)); + } else if (options.sendRequestToController() && + apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { + handleNotControllerError(Errors.NOT_CONTROLLER); + } else { + future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(Collections.singletonList(future), throwable); + } + }; + + runnable.call(call, now); + return new DescribeFeaturesResult(future); + } + + @Override + public UpdateFeaturesResult updateFeatures(final Map<String, FeatureUpdate> featureUpdates, + final UpdateFeaturesOptions options) { + if (featureUpdates.isEmpty()) { + throw new IllegalArgumentException("Feature updates can not be null or empty."); + } + + final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>(); + for (final Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + updateFutures.put(entry.getKey(), new KafkaFutureImpl<>()); + } + + final long now = time.milliseconds(); + final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { + final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData + = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection(); + for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + final String feature = entry.getKey(); + final FeatureUpdate update = entry.getValue(); + if (feature.trim().isEmpty()) { + throw new IllegalArgumentException("Provided feature can not be null or empty."); + } + + final UpdateFeaturesRequestData.FeatureUpdateKey requestItem = + new UpdateFeaturesRequestData.FeatureUpdateKey(); + requestItem.setFeature(feature); + requestItem.setMaxVersionLevel(update.maxVersionLevel()); + requestItem.setAllowDowngrade(update.allowDowngrade()); + featureUpdatesRequestData.add(requestItem); + } + return new UpdateFeaturesRequest.Builder( + new UpdateFeaturesRequestData() + .setTimeoutMs(timeoutMs) + .setFeatureUpdates(featureUpdatesRequestData)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final UpdateFeaturesResponse response = + (UpdateFeaturesResponse) abstractResponse; + + Errors topLevelError = Errors.forCode(response.data().errorCode()); + switch (topLevelError) { + case NONE: + for (final UpdatableFeatureResult result : response.data().results()) { + final KafkaFutureImpl<Void> future = updateFutures.get(result.feature()); + if (future == null) { + log.warn("Server response mentioned unknown feature {}", result.feature()); + } else { + final Errors error = Errors.forCode(result.errorCode()); + if (error == Errors.NONE) { + future.complete(null); + } else { + future.completeExceptionally(error.exception(result.errorMessage())); + } + } + } + // The server should send back a response for every feature, but we do a sanity check anyway. + completeUnrealizedFutures(updateFutures.entrySet().stream(), + feature -> "The controller response did not contain a result for feature " + feature); + break; + case NOT_CONTROLLER: + handleNotControllerError(topLevelError); + break; + default: + for (final Map.Entry<String, KafkaFutureImpl<Void>> entry : updateFutures.entrySet()) { + entry.getValue().completeExceptionally(topLevelError.exception()); Review comment: Done. Addressed in #9393. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org