chia7712 commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r501489060
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4335,6 +4343,150 @@ void handleFailure(Throwable throwable) {
.hi(password, salt, iterations);
}
+ public DescribeFeaturesResult describeFeatures(final
DescribeFeaturesOptions options) {
+ final KafkaFutureImpl<FeatureMetadata> future = new
KafkaFutureImpl<>();
+ final long now = time.milliseconds();
+ final NodeProvider provider =
+ options.sendRequestToController() ? new ControllerNodeProvider() :
new LeastLoadedNodeProvider();
+
+ final Call call = new Call(
+ "describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
provider) {
+
+ private FeatureMetadata createFeatureMetadata(final
ApiVersionsResponse response) {
+ final Map<String, FinalizedVersionRange> finalizedFeatures =
new HashMap<>();
+ for (final FinalizedFeatureKey key :
response.data().finalizedFeatures().valuesSet()) {
+ finalizedFeatures.put(key.name(), new
FinalizedVersionRange(key.minVersionLevel(), key.maxVersionLevel()));
+ }
+
+ Optional<Long> finalizedFeaturesEpoch;
+ if (response.data().finalizedFeaturesEpoch() >= 0L) {
+ finalizedFeaturesEpoch =
Optional.of(response.data().finalizedFeaturesEpoch());
+ } else {
+ finalizedFeaturesEpoch = Optional.empty();
+ }
+
+ final Map<String, SupportedVersionRange> supportedFeatures =
new HashMap<>();
+ for (final SupportedFeatureKey key :
response.data().supportedFeatures().valuesSet()) {
+ supportedFeatures.put(key.name(), new
SupportedVersionRange(key.minVersion(), key.maxVersion()));
+ }
+
+ return new FeatureMetadata(finalizedFeatures,
finalizedFeaturesEpoch, supportedFeatures);
+ }
+
+ @Override
+ ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+ return new ApiVersionsRequest.Builder();
+ }
+
+ @Override
+ void handleResponse(AbstractResponse response) {
+ final ApiVersionsResponse apiVersionsResponse =
(ApiVersionsResponse) response;
+ if (apiVersionsResponse.data.errorCode() ==
Errors.NONE.code()) {
+
future.complete(createFeatureMetadata(apiVersionsResponse));
+ } else if (options.sendRequestToController() &&
+ apiVersionsResponse.data.errorCode() ==
Errors.NOT_CONTROLLER.code()) {
+ handleNotControllerError(Errors.NOT_CONTROLLER);
+ } else {
+
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+ }
+ }
+
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(Collections.singletonList(future),
throwable);
+ }
+ };
+
+ runnable.call(call, now);
+ return new DescribeFeaturesResult(future);
+ }
+
+ @Override
+ public UpdateFeaturesResult updateFeatures(final Map<String,
FeatureUpdate> featureUpdates,
+ final UpdateFeaturesOptions
options) {
+ if (featureUpdates.isEmpty()) {
+ throw new IllegalArgumentException("Feature updates can not be
null or empty.");
+ }
+
+ final Map<String, KafkaFutureImpl<Void>> updateFutures = new
HashMap<>();
+ for (final Map.Entry<String, FeatureUpdate> entry :
featureUpdates.entrySet()) {
+ updateFutures.put(entry.getKey(), new KafkaFutureImpl<>());
+ }
+
+ final long now = time.milliseconds();
+ final Call call = new Call("updateFeatures", calcDeadlineMs(now,
options.timeoutMs()),
+ new ControllerNodeProvider()) {
+
+ @Override
+ UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+ final UpdateFeaturesRequestData.FeatureUpdateKeyCollection
featureUpdatesRequestData
+ = new
UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+ for (Map.Entry<String, FeatureUpdate> entry :
featureUpdates.entrySet()) {
+ final String feature = entry.getKey();
+ final FeatureUpdate update = entry.getValue();
+ if (feature.trim().isEmpty()) {
+ throw new IllegalArgumentException("Provided feature
can not be null or empty.");
+ }
+
+ final UpdateFeaturesRequestData.FeatureUpdateKey
requestItem =
+ new UpdateFeaturesRequestData.FeatureUpdateKey();
+ requestItem.setFeature(feature);
+ requestItem.setMaxVersionLevel(update.maxVersionLevel());
+ requestItem.setAllowDowngrade(update.allowDowngrade());
+ featureUpdatesRequestData.add(requestItem);
+ }
+ return new UpdateFeaturesRequest.Builder(
+ new UpdateFeaturesRequestData()
+ .setTimeoutMs(timeoutMs)
+ .setFeatureUpdates(featureUpdatesRequestData));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ final UpdateFeaturesResponse response =
+ (UpdateFeaturesResponse) abstractResponse;
+
+ Errors topLevelError =
Errors.forCode(response.data().errorCode());
+ switch (topLevelError) {
+ case NONE:
+ for (final UpdatableFeatureResult result :
response.data().results()) {
+ final KafkaFutureImpl<Void> future =
updateFutures.get(result.feature());
+ if (future == null) {
+ log.warn("Server response mentioned unknown
feature {}", result.feature());
+ } else {
+ final Errors error =
Errors.forCode(result.errorCode());
+ if (error == Errors.NONE) {
+ future.complete(null);
+ } else {
+
future.completeExceptionally(error.exception(result.errorMessage()));
+ }
+ }
+ }
+ // The server should send back a response for every
feature, but we do a sanity check anyway.
+
completeUnrealizedFutures(updateFutures.entrySet().stream(),
+ feature -> "The controller response did not
contain a result for feature " + feature);
+ break;
+ case NOT_CONTROLLER:
+ handleNotControllerError(topLevelError);
+ break;
+ default:
+ for (final Map.Entry<String, KafkaFutureImpl<Void>>
entry : updateFutures.entrySet()) {
+
entry.getValue().completeExceptionally(topLevelError.exception());
Review comment:
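The top-level error message is not propagated. The per-feature branch builds its exception with `error.exception(result.errorMessage())`, but this default branch calls `topLevelError.exception()` with no message, so any top-level error message carried by the response is dropped.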
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org