kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496531037



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no feature
+   * incompatibilities seen with all known brokers for the provided feature update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which can be empty when no
+   *                               FinalizedVersionRange exists for the associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to