kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r494183456
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { + final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final NodeProvider provider = + options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + + Call call = new Call( + "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + + @Override + ApiVersionsRequest.Builder createRequest(int timeoutMs) { + return new ApiVersionsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse response) { + final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; + if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { + future.complete( + new FeatureMetadata( + apiVersionsResponse.finalizedFeatures(), + apiVersionsResponse.finalizedFeaturesEpoch(), + apiVersionsResponse.supportedFeatures())); + } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { + handleNotControllerError(Errors.NOT_CONTROLLER); + } else { + future.completeExceptionally( + Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(Collections.singletonList(future), throwable); + } + }; + + runnable.call(call, now); + return new DescribeFeaturesResult(future); + } + + @Override + public UpdateFeaturesResult updateFeatures( + final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) { + if (featureUpdates == null || featureUpdates.isEmpty()) { + throw new IllegalArgumentException("Feature updates can not be null or empty."); + } + Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null"); + + final UpdateFeaturesRequestData request = UpdateFeaturesRequest.create(featureUpdates); + final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>(); + for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + updateFutures.put(entry.getKey(), new KafkaFutureImpl<>()); + } + final long now = time.milliseconds(); + final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { + return new UpdateFeaturesRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final UpdateFeaturesResponse response = + (UpdateFeaturesResponse) abstractResponse; + + // Check for controller change. + for (UpdatableFeatureResult result : response.data().results()) { + final Errors error = Errors.forCode(result.errorCode()); + if (error == Errors.NOT_CONTROLLER) { + handleNotControllerError(error); + throw error.exception(); Review comment: > handleNotControllerError() already throws an exception. Done. Fixed the code to not throw exception again when handling NOT_CONTROLLER error. > Should other errors like CLUSTER_AUTHORIZATION_FAILED be treated in the same way? I'm not sure how could we treat it the same way. In the case of the NOT_CONTROLLER error, the admin client code would retry the request once again when the exception is raised. But when cluster authorization fails, would a retry help? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4071,6 +4078,113 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { + final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + final NodeProvider provider = + options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + + Call call = new Call( + "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + + @Override + ApiVersionsRequest.Builder createRequest(int timeoutMs) { + return new ApiVersionsRequest.Builder(); + } + + @Override + void handleResponse(AbstractResponse response) { + final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; + if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { + future.complete( + new FeatureMetadata( + apiVersionsResponse.finalizedFeatures(), + apiVersionsResponse.finalizedFeaturesEpoch(), + apiVersionsResponse.supportedFeatures())); + } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { + handleNotControllerError(Errors.NOT_CONTROLLER); + } else { + future.completeExceptionally( + Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); + } + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(Collections.singletonList(future), throwable); + } + }; + + runnable.call(call, now); + return new DescribeFeaturesResult(future); + } + + @Override + public UpdateFeaturesResult updateFeatures( + final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) { + if (featureUpdates == null || featureUpdates.isEmpty()) { + throw new IllegalArgumentException("Feature updates can not be null or empty."); + } + Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null"); + + final UpdateFeaturesRequestData request = UpdateFeaturesRequest.create(featureUpdates); + final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>(); + for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) { + updateFutures.put(entry.getKey(), new KafkaFutureImpl<>()); + } + final long now = time.milliseconds(); + final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { + return new UpdateFeaturesRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final UpdateFeaturesResponse response = + (UpdateFeaturesResponse) abstractResponse; + + // Check for controller change. + for (UpdatableFeatureResult result : response.data().results()) { + final Errors error = Errors.forCode(result.errorCode()); + if (error == Errors.NOT_CONTROLLER) { + handleNotControllerError(error); + throw error.exception(); Review comment: > handleNotControllerError() already throws an exception. Done. Fixed the code to not throw exception again when handling NOT_CONTROLLER error. > Should other errors like CLUSTER_AUTHORIZATION_FAILED be treated in the same way? I'm not sure how could we treat it the same way. In the case of the NOT_CONTROLLER error, the admin client code would retry the request once again when the exception is raised. But when cluster authorization fails, would a retry help? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org