junrao commented on code in PR #16443: URL: https://github.com/apache/kafka/pull/16443#discussion_r1771950807
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -241,6 +255,16 @@ private ApiError updateFeature(
             // Perform additional checks if we're updating metadata.version
             return updateMetadataVersion(newVersion, upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
         } else {
+            // Validate dependencies for features that are not metadata.version
+            try {
+                if (newVersion != 0) {
+                    Features.validateVersion(
+                        Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),

Review Comment:
   It would be useful to add a comment on why allowUnstableFeatureVersions is always true.

##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -2305,11 +2307,34 @@ public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(
             }).thenApply(result -> {
                 UpdateFeaturesResponseData responseData = new UpdateFeaturesResponseData();
                 responseData.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
-                result.forEach((featureName, error) -> responseData.results().add(
-                    new UpdateFeaturesResponseData.UpdatableFeatureResult()
-                        .setFeature(featureName)
-                        .setErrorCode(error.error().code())
-                        .setErrorMessage(error.message())));
+                Optional<Entry<String, ApiError>> errorEntry = Optional.empty();
+                if (context.requestHeader().requestApiVersion() > 1) {
+                    Stream<Entry<String, ApiError>> errorEntries = result.entrySet().stream().filter(entry ->
+                        !entry.getValue().error().equals(Errors.NONE));
+                    errorEntry = errorEntries.findFirst();
+                }
+
+                if (errorEntry.isPresent()) {
+                    String errorFeatureName = errorEntry.get().getKey();
+                    ApiError topError = errorEntry.get().getValue();
+                    String errorString = errorFeatureName + ":" + topError.error().exceptionName() + " (" + topError.message() + ")";
+                    responseData.setErrorCode(Errors.INVALID_UPDATE_VERSION.code());

Review Comment:
   Should we use the error code from `topError`?

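For illustration, the suggestion to reuse the error code from `topError` could look roughly like the sketch below. It is standalone and is not the PR's code: `Err`, `SimpleApiError`, and `TopLevelErrorSketch` are hypothetical stand-ins for Kafka's `Errors` and `ApiError`, and the feature names are made up. The only point is that the first failing per-feature error is promoted to the top level with its own code instead of a fixed `INVALID_UPDATE_VERSION`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class TopLevelErrorSketch {
    // Hypothetical stand-in for Kafka's Errors enum (only a few values shown).
    public enum Err { NONE, INVALID_UPDATE_VERSION, FEATURE_UPDATE_FAILED }

    // Hypothetical stand-in for Kafka's ApiError: an error code plus a message.
    public static final class SimpleApiError {
        public final Err error;
        public final String message;

        public SimpleApiError(Err error, String message) {
            this.error = error;
            this.message = message;
        }
    }

    // Promote the first non-NONE per-feature error to a top-level error,
    // keeping that error's own code rather than substituting a fixed one.
    public static Optional<SimpleApiError> topLevelError(Map<String, SimpleApiError> results) {
        return results.entrySet().stream()
            .filter(e -> e.getValue().error != Err.NONE)
            .findFirst()
            .map(e -> new SimpleApiError(
                e.getValue().error,
                // The sketch uses the enum name where the diff uses exceptionName().
                e.getKey() + ":" + e.getValue().error.name() + " (" + e.getValue().message + ")"));
    }

    public static void main(String[] args) {
        Map<String, SimpleApiError> results = new LinkedHashMap<>();
        results.put("feature.a", new SimpleApiError(Err.NONE, null));
        results.put("feature.b", new SimpleApiError(Err.FEATURE_UPDATE_FAILED, "unsupported level"));
        topLevelError(results).ifPresent(e -> System.out.println(e.error + " / " + e.message));
    }
}
```

Whether the real response should carry the original code or a single umbrella code is exactly the open question in the comment above; the sketch only shows what the first option would involve.
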
##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -174,15 +175,27 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
         Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
         boolean validateOnly
     ) {
+        boolean updateFailed = false;
         TreeMap<String, ApiError> results = new TreeMap<>();
         List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+
+        Map<String, Short> proposedUpdatedVersions = new HashMap<>();
+        finalizedVersions.forEach(proposedUpdatedVersions::put);
+        proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
+        updates.forEach(proposedUpdatedVersions::put);
+
         for (Entry<String, Short> entry : updates.entrySet()) {
-            results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
-                upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records));
+            ApiError error = updateFeature(entry.getKey(), entry.getValue(),
+                upgradeTypes.getOrDefault(entry.getKey(), FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
+            results.put(entry.getKey(), error);
+            if (!error.error().equals(Errors.NONE)) {
+                updateFailed = true;
+                break;
+            }
         }
-        if (validateOnly) {
+        if (validateOnly || updateFailed) {

Review Comment:
   We changed the implementation such that if one feature has an error, none of the features will be processed. It seems that we only need to return a top level error in UpdateFeaturesResponse. There is no need to have the per feature error code.

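To make the all-or-nothing behaviour described in this comment concrete, here is a minimal standalone sketch. It assumes a hypothetical validation rule (a level must lie between 0 and `maxLevel`), and `AtomicUpdateSketch`, `Outcome`, and `updateAll` are illustrative names that do not exist in Kafka. The first failure stops processing, discards any records generated so far, and is reported as a single top-level error with no per-feature results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class AtomicUpdateSketch {
    // Result of a batch update: either records to commit, or one top-level error.
    public static final class Outcome {
        public final List<String> records;
        public final Optional<String> topLevelError;

        public Outcome(List<String> records, Optional<String> topLevelError) {
            this.records = records;
            this.topLevelError = topLevelError;
        }
    }

    // Hypothetical rule for illustration: a level is valid if 0 <= level <= maxLevel.
    public static Outcome updateAll(Map<String, Short> updates, short maxLevel) {
        List<String> records = new ArrayList<>();
        // TreeMap gives a deterministic iteration order, as in the diff above.
        for (Map.Entry<String, Short> entry : new TreeMap<>(updates).entrySet()) {
            short level = entry.getValue();
            if (level < 0 || level > maxLevel) {
                // Fail fast: drop everything generated so far and report one error.
                return new Outcome(new ArrayList<>(),
                    Optional.of(entry.getKey() + ": invalid level " + level));
            }
            records.add(entry.getKey() + "=" + level);
        }
        return new Outcome(records, Optional.empty());
    }
}
```

With this shape the caller never needs a per-feature error map, because a batch either yields records for every feature or yields exactly one error.
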
##########
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java:
##########
@@ -82,17 +84,37 @@ public static UpdateFeaturesResponse parse(ByteBuffer buffer, short version) {
         return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new ByteBufferAccessor(buffer), version));
     }
-    public static UpdateFeaturesResponse createWithErrors(ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
+    public static UpdateFeaturesResponse createWithErrors(short version, ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
         final UpdatableFeatureResultCollection results = new UpdatableFeatureResultCollection();
-        for (final Map.Entry<String, ApiError> updateError : updateErrors.entrySet()) {
-            final String feature = updateError.getKey();
-            final ApiError error = updateError.getValue();
-            final UpdatableFeatureResult result = new UpdatableFeatureResult();
-            result.setFeature(feature)
-                .setErrorCode(error.error().code())
-                .setErrorMessage(error.message());
-            results.add(result);
+        Optional<Map.Entry<String, ApiError>> errorEntry = Optional.empty();
+        if (version > 1) {
+            Stream<Map.Entry<String, ApiError>> errorEntries = updateErrors.entrySet().stream().filter(entry ->
+                !entry.getValue().error().equals(Errors.NONE));
+            errorEntry = errorEntries.findFirst();
         }
+
+        if (errorEntry.isPresent()) {
+            String errorFeatureName = errorEntry.get().getKey();
+            ApiError topError = errorEntry.get().getValue();
+            String errorString = errorFeatureName + ":" + topError.error().exceptionName() + " (" + topError.message() + ")";
+            topLevelError = new ApiError(Errors.INVALID_UPDATE_VERSION.code(),

Review Comment:
   Should we use the error code from topError?

##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3603,13 +3603,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): Unit = {
       def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
         errors match {
+          // Hard-code version to 1 since version 2 will not be implemented for 4.0

Review Comment:
   How do we prevent the client from issuing V2 updateFeature request in ZK mode?