junrao commented on code in PR #16443:
URL: https://github.com/apache/kafka/pull/16443#discussion_r1779270778


##########
clients/src/main/resources/common/message/UpdateFeaturesResponse.json:
##########
@@ -26,13 +26,13 @@
       "about": "The top-level error code, or `0` if there was no top-level 
error." },
     { "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+",
       "about": "The top-level error message, or `null` if there was no 
top-level error." },
-    { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+    { "name": "Results", "type": "[]UpdatableFeatureResult", "versions": 
"0-1", "ignorable": true,
       "about": "Results for each feature update.", "fields": [
-      { "name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Feature", "type": "string", "versions": "0-1", "mapKey": true,

Review Comment:
   Do we need to cap the version in the sub fields? For example, 
DeleteTopicsRequest doesn't do that in v6.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -2314,11 +2315,24 @@ public CompletableFuture<UpdateFeaturesResponseData> 
updateFeatures(
         }).thenApply(result -> {
             UpdateFeaturesResponseData responseData = new 
UpdateFeaturesResponseData();
             responseData.setResults(new 
UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
-            result.forEach((featureName, error) -> responseData.results().add(
-                new UpdateFeaturesResponseData.UpdatableFeatureResult()
-                    .setFeature(featureName)
-                    .setErrorCode(error.error().code())
-                    .setErrorMessage(error.message())));
+            Optional<Entry<String, ApiError>> errorEntry = 
result.entrySet().stream().filter(entry ->

Review Comment:
   In the previous line, we don't want to call `responseData.setResults` if 
there is an error, right?



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -241,6 +253,17 @@ private ApiError updateFeature(
             // Perform additional checks if we're updating metadata.version
             return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
         } else {
+            // Validate dependencies for features that are not metadata.version
+            try {
+                if (newVersion != 0) {

Review Comment:
   Should we validate for v0 too?



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -177,9 +178,19 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
         TreeMap<String, ApiError> results = new TreeMap<>();
         List<ApiMessageAndVersion> records =
                 BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+
+        Map<String, Short> proposedUpdatedVersions = new HashMap<>();
+        finalizedVersions.forEach(proposedUpdatedVersions::put);
+        proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get().featureLevel());
+        updates.forEach(proposedUpdatedVersions::put);
+
         for (Entry<String, Short> entry : updates.entrySet()) {
-            results.put(entry.getKey(), updateFeature(entry.getKey(), 
entry.getValue(),
-                upgradeTypes.getOrDefault(entry.getKey(), 
FeatureUpdate.UpgradeType.UPGRADE), records));
+            ApiError error = updateFeature(entry.getKey(), entry.getValue(),
+                upgradeTypes.getOrDefault(entry.getKey(), 
FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
+            if (!error.error().equals(Errors.NONE)) {
+                return ControllerResult.of(Collections.emptyList(), 
Collections.singletonMap(entry.getKey(), error));
+            }
+            results.put(entry.getKey(), error);

Review Comment:
   Could we just return `ControllerResult<ApiError>` for `updateFeatures`?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -2314,11 +2315,24 @@ public CompletableFuture<UpdateFeaturesResponseData> 
updateFeatures(
         }).thenApply(result -> {
             UpdateFeaturesResponseData responseData = new 
UpdateFeaturesResponseData();
             responseData.setResults(new 
UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
-            result.forEach((featureName, error) -> responseData.results().add(
-                new UpdateFeaturesResponseData.UpdatableFeatureResult()
-                    .setFeature(featureName)
-                    .setErrorCode(error.error().code())
-                    .setErrorMessage(error.message())));
+            Optional<Entry<String, ApiError>> errorEntry = 
result.entrySet().stream().filter(entry ->
+                !entry.getValue().error().equals(Errors.NONE)).findFirst();
+                
+            if (errorEntry.isPresent()) {
+                String errorFeatureName = errorEntry.get().getKey();
+                ApiError topError = errorEntry.get().getValue();
+                String errorString = errorFeatureName + ":" + 
topError.error().exceptionName() + " (" + topError.message() + ")";

Review Comment:
   Do we need to construct the new error message? 
`FeatureControlManager.invalidUpdateVersion` already includes the feature name 
in the error message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to