junrao commented on code in PR #16443:
URL: https://github.com/apache/kafka/pull/16443#discussion_r1771950807


##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -241,6 +255,16 @@ private ApiError updateFeature(
             // Perform additional checks if we're updating metadata.version
             return updateMetadataVersion(newVersion, 
upgradeType.equals(FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), records::add);
         } else {
+            // Validate dependencies for features that are not metadata.version
+            try {
+                if (newVersion != 0) {
+                    Features.validateVersion(
+                        
Features.featureFromName(featureName).fromFeatureLevel(newVersion, true),

Review Comment:
   It would be useful to add a comment on why allowUnstableFeatureVersions is 
always true.



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -2305,11 +2307,34 @@ public CompletableFuture<UpdateFeaturesResponseData> 
updateFeatures(
         }).thenApply(result -> {
             UpdateFeaturesResponseData responseData = new 
UpdateFeaturesResponseData();
             responseData.setResults(new 
UpdateFeaturesResponseData.UpdatableFeatureResultCollection(result.size()));
-            result.forEach((featureName, error) -> responseData.results().add(
-                new UpdateFeaturesResponseData.UpdatableFeatureResult()
-                    .setFeature(featureName)
-                    .setErrorCode(error.error().code())
-                    .setErrorMessage(error.message())));
+            Optional<Entry<String, ApiError>> errorEntry = Optional.empty();
+            if (context.requestHeader().requestApiVersion() > 1) {
+                Stream<Entry<String, ApiError>> errorEntries = 
result.entrySet().stream().filter(entry ->
+                        !entry.getValue().error().equals(Errors.NONE));
+                errorEntry = errorEntries.findFirst();
+            }
+                
+            if (errorEntry.isPresent()) {
+                String errorFeatureName = errorEntry.get().getKey();
+                ApiError topError = errorEntry.get().getValue();
+                String errorString = errorFeatureName + ":" + 
topError.error().exceptionName() + " (" + topError.message() + ")";
+                
responseData.setErrorCode(Errors.INVALID_UPDATE_VERSION.code());

Review Comment:
   Should we use the error code from `topError`?



##########
metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java:
##########
@@ -174,15 +175,27 @@ ControllerResult<Map<String, ApiError>> updateFeatures(
         Map<String, FeatureUpdate.UpgradeType> upgradeTypes,
         boolean validateOnly
     ) {
+        boolean updateFailed = false;
         TreeMap<String, ApiError> results = new TreeMap<>();
         List<ApiMessageAndVersion> records =
                 BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
+
+        Map<String, Short> proposedUpdatedVersions = new HashMap<>();
+        finalizedVersions.forEach(proposedUpdatedVersions::put);
+        proposedUpdatedVersions.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.get().featureLevel());
+        updates.forEach(proposedUpdatedVersions::put);
+
         for (Entry<String, Short> entry : updates.entrySet()) {
-            results.put(entry.getKey(), updateFeature(entry.getKey(), 
entry.getValue(),
-                upgradeTypes.getOrDefault(entry.getKey(), 
FeatureUpdate.UpgradeType.UPGRADE), records));
+            ApiError error = updateFeature(entry.getKey(), entry.getValue(),
+                upgradeTypes.getOrDefault(entry.getKey(), 
FeatureUpdate.UpgradeType.UPGRADE), records, proposedUpdatedVersions);
+            results.put(entry.getKey(), error);
+            if (!error.error().equals(Errors.NONE)) {
+                updateFailed = true;
+                break;
+            }
         }
 
-        if (validateOnly) {
+        if (validateOnly || updateFailed) {

Review Comment:
   We changed the implementation such that if one feature has an error, none of 
the features will be processed. It seems that we only need to return a top 
level error in UpdateFeaturesResponse. There is no need to have the per feature 
error code.



##########
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java:
##########
@@ -82,17 +84,37 @@ public static UpdateFeaturesResponse parse(ByteBuffer 
buffer, short version) {
         return new UpdateFeaturesResponse(new UpdateFeaturesResponseData(new 
ByteBufferAccessor(buffer), version));
     }
 
-    public static UpdateFeaturesResponse createWithErrors(ApiError 
topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) {
+    public static UpdateFeaturesResponse createWithErrors(short version, 
ApiError topLevelError, Map<String, ApiError> updateErrors, int throttleTimeMs) 
{
         final UpdatableFeatureResultCollection results = new 
UpdatableFeatureResultCollection();
-        for (final Map.Entry<String, ApiError> updateError : 
updateErrors.entrySet()) {
-            final String feature = updateError.getKey();
-            final ApiError error = updateError.getValue();
-            final UpdatableFeatureResult result = new UpdatableFeatureResult();
-            result.setFeature(feature)
-                .setErrorCode(error.error().code())
-                .setErrorMessage(error.message());
-            results.add(result);
+        Optional<Map.Entry<String, ApiError>> errorEntry = Optional.empty();
+        if (version > 1) {
+            Stream<Map.Entry<String, ApiError>> errorEntries = 
updateErrors.entrySet().stream().filter(entry ->
+                    !entry.getValue().error().equals(Errors.NONE));
+            errorEntry = errorEntries.findFirst();
         }
+
+        if (errorEntry.isPresent()) {
+            String errorFeatureName = errorEntry.get().getKey();
+            ApiError topError = errorEntry.get().getValue();
+            String errorString = errorFeatureName + ":" + 
topError.error().exceptionName() + " (" + topError.message() + ")";
+            topLevelError = new ApiError(Errors.INVALID_UPDATE_VERSION.code(),

Review Comment:
   Should we use the error code from topError?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3603,13 +3603,16 @@ class KafkaApis(val requestChannel: RequestChannel,
     def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
       def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
         errors match {
+          // Hard-code version to 1 since version 2 will not be implemented 
for 4.0

Review Comment:
   How do we prevent the client from issuing V2 updateFeature request in ZK 
mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to