kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496528169
########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { + info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") + zkClient.createFeatureZNode(newNode) + val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { + info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") + zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + * A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + * setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + * the possible supported features finalized immediately. Assuming this is the case, the + * controller will start up and notice that the FeatureZNode is absent in the new cluster, + * it will then create a FeatureZNode (with enabled status) containing the entire list of + * default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + * Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + * broker binary has been upgraded to a newer version that supports the feature versioning + * system (KIP-584). This means the user is upgrading from an earlier version of the broker + * binary. In this case, we want to start with no finalized features and allow the user to + * finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + * to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + * features. This process ensures we do not enable all the possible features immediately after + * an upgrade, which could be harmful to Kafka. + * This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + * controller will start up and check if the FeatureZNode is absent. If absent, it will + * react by creating a FeatureZNode with disabled status and empty finalized features. + * Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + * KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + * and whether it is disabled. In such a case, it won’t upgrade all features immediately. + * Instead it will just switch the FeatureZNode status to enabled status. This lets the + * user finalize the features later. + * + * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0: + * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary + * has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher). + * The controller will start up and find that a FeatureZNode is already present with enabled + * status and existing finalized features. In such a case, the controller needs to scan the + * existing finalized features and mutate them for the purpose of version level deprecation + * (if needed). + * This is how we handle this case: If an existing finalized feature is present in the default + * finalized features, then, its existing minimum version level is updated to the default + * minimum version level maintained in the BrokerFeatures object. The goal of this mutation is + * to permanently deprecate one or more feature version levels. The range of feature version + * levels deprecated are from the closed range: [existing_min_version_level, default_min_version_level]. + * NOTE: Deprecating a feature version level is an incompatible change, which requires a major + * release of Kafka. In such a release, the minimum version level maintained within the + * BrokerFeatures class is updated suitably to record the deprecation of the feature. + * + * 4. Broker downgrade: + * Imagine that a Kafka cluster exists already and the IBP config is greater than or equal to + * KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by setting IBP config to a + * value less than KAFKA_2_7_IV0. This means the user is also disabling the feature versioning + * system (KIP-584). In this case, when the controller starts up with the lower IBP config, it + * will switch the FeatureZNode status to disabled with empty features. + */ + private def enableFeatureVersioning(): Unit = { + val defaultFinalizedFeatures = brokerFeatures.defaultFinalizedFeatures + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path) + if (version == ZkVersion.UnknownVersion) { + val newVersion = createFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures)) + featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + } else { + val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + var newFeatures: Features[FinalizedVersionRange] = Features.emptyFinalizedFeatures() + if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) { + newFeatures = Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map { + case (featureName, existingVersionRange) => + val brokerDefaultVersionRange = defaultFinalizedFeatures.get(featureName) + if (brokerDefaultVersionRange == null) { + warn(s"Existing finalized feature: $featureName with $existingVersionRange" + + s" is absent in default finalized $defaultFinalizedFeatures") + (featureName, existingVersionRange) + } else if (brokerDefaultVersionRange.max() >= existingVersionRange.max() && + brokerDefaultVersionRange.min() <= existingVersionRange.max()) { + // Using the change below, we deprecate all version levels in the range: + // [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1]. + // + // NOTE: if existingVersionRange.min() equals brokerDefaultVersionRange.min(), then + // we do not deprecate any version levels (since there is none to be deprecated). + // + // Examples: + // 1. brokerDefaultVersionRange = [4, 7] and existingVersionRange = [1, 5]. + // In this case, we deprecate all version levels in the range: [1, 3]. + // 2. brokerDefaultVersionRange = [4, 7] and existingVersionRange = [4, 5]. + // In this case, we do not deprecate any version levels since + // brokerDefaultVersionRange.min() equals existingVersionRange.min(). + (featureName, new FinalizedVersionRange(brokerDefaultVersionRange.min(), existingVersionRange.max())) + } else { + // This is a serious error. We should never be reaching here, since we already + // verify once during KafkaServer startup that existing finalized feature versions in + // the FeatureZNode contained no incompatibilities. If we are here, it means that one + // of the following is true: + // 1. The existing version levels fall completely outside the range of the default + // finalized version levels (i.e. no intersection), or + // 2. The existing version levels are incompatible with default finalized version + // levels. + // + // Examples of invalid cases that can cause this exception to be triggered: + // 1. No intersection : brokerDefaultVersionRange = [4, 7] and existingVersionRange = [2, 3]. + // 2. No intersection : brokerDefaultVersionRange = [2, 3] and existingVersionRange = [4, 7]. + // 3. Incompatible versions: brokerDefaultVersionRange = [2, 3] and existingVersionRange = [1, 7]. + throw new IllegalStateException( + s"Can not update minimum version level in finalized feature: $featureName," + + s" since the existing $existingVersionRange is not eligible for a change" + + s" based on the default $brokerDefaultVersionRange. This should never happen" + + s" since feature version incompatibilities are already checked during" + + s" Kafka server startup.") + } + }.asJava) + } + val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures) + if (!newFeatureZNode.equals(existingFeatureZNode)) { + val newVersion = updateFeatureZNode(newFeatureZNode) + featureCache.waitUntilEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + } + } + } + + /** + * Disables the feature versioning system (KIP-584). + * + * Sets up the FeatureZNode with disabled status. This status means the feature versioning system + * (KIP-584) is disabled, and, the finalized features stored in the FeatureZNode are not relevant. + * This status should be written by the controller to the FeatureZNode only when the broker + * IBP config is less than KAFKA_2_7_IV0. + * + * NOTE: + * 1. When this method returns, existing finalized features (if any) will be cleared from the + * FeatureZNode. + * 2. This method, unlike enableFeatureVersioning() need not wait for the FinalizedFeatureCache + * to be updated, because, such updates to the cache (via FinalizedFeatureChangeListener) + * are disabled when IBP config is < than KAFKA_2_7_IV0. + */ + private def disableFeatureVersioning(): Unit = { + val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, Features.emptyFinalizedFeatures()) + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(FeatureZNode.path) + if (version == ZkVersion.UnknownVersion) { + createFeatureZNode(newNode) + } else { + val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (!existingFeatureZNode.status.equals(FeatureZNodeStatus.Disabled)) { + updateFeatureZNode(newNode) + } + } + } + + private def setupFeatureVersioning(): Unit = { Review comment: Done. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") + } + + val supportedVersionRange = brokerFeatures.supportedFeatures.get(update.feature) + if (supportedVersionRange == null) { + Right(new ApiError(Errors.INVALID_REQUEST, + "Could not apply finalized feature update because the provided feature" + + " is not supported.")) + } else { + var newVersionRange: FinalizedVersionRange = null + try { + newVersionRange = new FinalizedVersionRange(supportedVersionRange.firstActiveVersion, update.maxVersionLevel) + } catch { + case _: IllegalArgumentException => { + // This exception means the provided maxVersionLevel is invalid. It is handled below + // outside of this catch clause. + } + } + if (newVersionRange == null) { + Right(new ApiError(Errors.INVALID_REQUEST, + "Could not apply finalized feature update because the provided" + + s" maxVersionLevel:${update.maxVersionLevel} is lower than the" + + s" first active version:${supportedVersionRange.firstActiveVersion}.")) + } else { + val newFinalizedFeature = + Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + BrokerFeatures.hasIncompatibleFeatures(broker.features, newFinalizedFeature) + }) + if (numIncompatibleBrokers == 0) { + Left(newVersionRange) + } else { + Right(new ApiError(Errors.INVALID_REQUEST, + "Could not apply finalized feature update because" + + " brokers were found to have incompatible versions for the feature.")) + } + } + } + } + + /** + * Validates a feature update on an existing FinalizedVersionRange. + * If the validation succeeds, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the validation fails, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * @param existingVersionRange the existing FinalizedVersionRange which can be empty when no + * FinalizedVersionRange exists for the associated feature + * + * @return the new FinalizedVersionRange to be updated into ZK or error + * as described above. + */ + private def validateFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey, + existingVersionRange: Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] = { + def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) + .fold(versionRange => Left(Some(versionRange)), error => Right(error)) + } + + if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) + } else { + // We handle deletion requests separately from non-deletion requests. + if (UpdateFeaturesRequest.isDeleteRequest(update)) { + if (existingVersionRange.isEmpty) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + "Can not delete non-existing finalized feature.")) + } else { + Left(Option.empty) + } + } else if (update.maxVersionLevel() < 1) { + // Disallow deletion of a finalized feature without allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" + + s" than 1 without setting the allowDowngrade flag to true in the request.")) + } else { + existingVersionRange.map(existing => + if (update.maxVersionLevel == existing.max) { + // Disallow a case where target maxVersionLevel matches existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not ${if (update.allowDowngrade) "downgrade" else "upgrade"}" + + s" a finalized feature from existing maxVersionLevel:${existing.max}" + + " to the same value.")) + } else if (update.maxVersionLevel < existing.max && !update.allowDowngrade) { + // Disallow downgrade of a finalized feature without the allowDowngrade flag set. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not downgrade finalized feature from existing" + + s" maxVersionLevel:${existing.max} to provided" + + s" maxVersionLevel:${update.maxVersionLevel} without setting the" + + " allowDowngrade flag in the request.")) + } else if (update.allowDowngrade && update.maxVersionLevel > existing.max) { + // Disallow a request that sets allowDowngrade flag without specifying a + // maxVersionLevel that's lower than the existing maxVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"When the allowDowngrade flag set in the request, the provided" + + s" maxVersionLevel:${update.maxVersionLevel} can not be greater than" + + s" existing maxVersionLevel:${existing.max}.")) + } else if (update.maxVersionLevel < existing.min) { + // Disallow downgrade of a finalized feature below the existing finalized + // minVersionLevel. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not downgrade finalized feature to maxVersionLevel:${update.maxVersionLevel}" + + s" because it's lower than the existing minVersionLevel:${existing.min}.")) + } else { + newVersionRangeOrError(update) + } + ).getOrElse(newVersionRangeOrError(update)) + } + } + } + + private def processFeatureUpdates(request: UpdateFeaturesRequest, + callback: UpdateFeaturesCallback): Unit = { + if (isActive) { + processFeatureUpdatesWithActiveController(request, callback) + } else { + callback(Left(new ApiError(Errors.NOT_CONTROLLER))) + } + } + + private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest, + callback: UpdateFeaturesCallback): Unit = { + val updates = request.data.featureUpdates + val existingFeatures = featureCache.get + .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala) + .getOrElse(Map[String, FinalizedVersionRange]()) + // A map with key being feature name and value being FinalizedVersionRange. + // This contains the target features to be eventually written to FeatureZNode. + val targetFeatures = scala.collection.mutable.Map[String, FinalizedVersionRange]() ++ existingFeatures + // A map with key being feature name and value being error encountered when the FeatureUpdate + // was applied. + val errors = scala.collection.mutable.Map[String, ApiError]() + + // Below we process each FeatureUpdate using the following logic: + // - If a FeatureUpdate is found to be valid, then: + // - The corresponding entry in errors map would be updated to contain ApiError(Errors.NONE). + // - If the FeatureUpdate is an add or update request, then the targetFeatures map is updated + // to contain the new FinalizedVersionRange for the feature. + // - Otherwise if the FeatureUpdate is a delete request, then the feature is removed from the + // targetFeatures map. + // - Otherwise if a FeatureUpdate is found to be invalid, then: + // - The corresponding entry in errors map would be updated with the appropriate ApiError. + // - The entry in targetFeatures map is left untouched. + updates.asScala.iterator.foreach { update => + validateFeatureUpdate(update, existingFeatures.get(update.feature())) match { + case Left(newVersionRangeOrNone) => + newVersionRangeOrNone + .map(newVersionRange => targetFeatures += (update.feature() -> newVersionRange)) Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org