kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496531037
########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + + val supportedVersionRange = brokerFeatures.supportedFeatures.get(update.feature) + if (supportedVersionRange == null) { + Right(new ApiError(Errors.INVALID_REQUEST, + "Could not apply finalized feature update because the provided feature" + + " is not supported.")) + } else { + var newVersionRange: FinalizedVersionRange = null + try { + newVersionRange = new FinalizedVersionRange(supportedVersionRange.firstActiveVersion, update.maxVersionLevel) + } catch { + case _: IllegalArgumentException => { + // This exception means the provided maxVersionLevel is invalid. It is handled below + // outside of this catch clause. 
+ } + } + if (newVersionRange == null) { + Right(new ApiError(Errors.INVALID_REQUEST, + "Could not apply finalized feature update because the provided" + + s" maxVersionLevel:${update.maxVersionLevel} is lower than the" + + s" first active version:${supportedVersionRange.firstActiveVersion}.")) + } else { + val newFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + BrokerFeatures.hasIncompatibleFeatures(broker.features, newFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right(new ApiError(Errors.INVALID_REQUEST, + "Could not apply finalized feature update because" + + " brokers were found to have incompatible versions for the feature.")) + } + } + } + } + + /** + * Validates a feature update on an existing FinalizedVersionRange. + * If the validation succeeds, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the validation fails, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no + * FinalizedVersionRange exists for the associated feature + * + * @return the new FinalizedVersionRange to be updated into ZK or error + * as described above. + */ + private def validateFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey, + existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = { Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org