kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r496525949
########## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ########## @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the latest features supported by the Broker and also provides APIs to + * check for incompatibilities between the features supported by the Broker and finalized features. + * The class also enables feature version level deprecation, as explained below. This class is + * immutable in production. It provides few APIs to mutate state only for the purpose of testing. + * + * Feature version level deprecation: + * ================================== + * + * Deprecation of certain version levels of a feature is a process to stop supporting the + * functionality offered by the feature at a those version levels, across the entire Kafka cluster. + * Feature version deprecation is a simple 2-step process explained below. In each step below, an + * example is provided to help understand the process better: + * + * STEP 1: + * In the first step, a major Kafka release is made with a Broker code change (explained later + * below) that establishes the intent to deprecate certain versions of one or more features + * cluster-wide. When this new Kafka release is deployed to the cluster, the feature versioning + * system (via the controller) will automatically persist the new minVersionLevel for the feature in + * Zk to propagate the deprecation of certain versions. After this happens, any external client that + * queries the Broker to learn the feature versions will at some point start to see the new value + * for the finalized minVersionLevel for the feature. This makes the version deprecation permanent. + * + * Here is how the above code change needs to be done: + * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a + * specific firstActiveVersion value that's higher than the minVersion for the feature. The + * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate + * for that feature. When features are finalized via the ApiKeys.UPDATE_FEATURES api, the feature + * version levels in the closed range: [minVersion, firstActiveVersion - 1] are automatically + * deprecated in ZK by the controller logic. + * Example: + * - Let us assume the existing finalized feature in ZK: + * { + * "feature_1" -> FinalizedVersionRange(minVersionLevel=1, maxVersionLevel=5) + * } + * Now, supposing you would like to deprecate feature version levels: [1, 2]. + * Then, in the supportedFeatures map you should supply the following: + * supportedFeatures = { + * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, maxVersion=5) + * } + * - If you do NOT want to deprecate a version level for a feature, then in the supportedFeatures + * map you should supply the firstActiveVersion to be the same as the minVersion supplied for that + * feature. + * Example: + * supportedFeatures = { + * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, maxVersion=5) + * } + * This indicates no intent to deprecate any version levels for the feature. + * + * STEP 2: + * After the first step is over, you may (at some point) want to permanently remove the code/logic + * for the functionality offered by the deprecated feature versions. This is the second step. Here a + * subsequent major Kafka release is made with another Broker code change that removes the code for + * the functionality offered by the deprecated feature versions. This would completely drop support + * for the deprecated versions. Such a code change needs to be supplemented by supplying a + * suitable higher minVersion value for the feature in the supportedFeatures map. + * Example: + * - In the example above in step 1, we showed how to deprecate version levels [1, 2] for + * "feature_1". Now let us assume the following finalized feature in ZK (after the deprecation + * has been carried out): + * { + * "feature_1" -> FinalizedVersionRange(minVersionLevel=3, maxVersionLevel=5) + * } + * Now, supposing you would like to permanently remove support for feature versions: [1, 2]. + * Then, in the supportedFeatures map you should now supply the following: + * supportedFeatures = { + * "feature1" -> SupportedVersionRange(minVersion=3, firstActiveVersion=3, maxVersion=5) Review comment: This is a really good point. Yes, I feel it is useful to expose it to the client via `ApiVersionsResponse`. I can change the KIP suitably. ########## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ########## @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the latest features supported by the Broker and also provides APIs to + * check for incompatibilities between the features supported by the Broker and finalized features. + * The class also enables feature version level deprecation, as explained below. This class is + * immutable in production. It provides few APIs to mutate state only for the purpose of testing. + * + * Feature version level deprecation: + * ================================== + * + * Deprecation of certain version levels of a feature is a process to stop supporting the + * functionality offered by the feature at a those version levels, across the entire Kafka cluster. + * Feature version deprecation is a simple 2-step process explained below. In each step below, an + * example is provided to help understand the process better: + * + * STEP 1: + * In the first step, a major Kafka release is made with a Broker code change (explained later + * below) that establishes the intent to deprecate certain versions of one or more features + * cluster-wide. When this new Kafka release is deployed to the cluster, the feature versioning + * system (via the controller) will automatically persist the new minVersionLevel for the feature in + * Zk to propagate the deprecation of certain versions. After this happens, any external client that + * queries the Broker to learn the feature versions will at some point start to see the new value + * for the finalized minVersionLevel for the feature. This makes the version deprecation permanent. + * + * Here is how the above code change needs to be done: + * In order to deprecate feature version levels, in the supportedFeatures map you need to supply a + * specific firstActiveVersion value that's higher than the minVersion for the feature. The + * value for firstActiveVersion should be 1 beyond the highest version that you intend to deprecate + * for that feature. When features are finalized via the ApiKeys.UPDATE_FEATURES api, the feature + * version levels in the closed range: [minVersion, firstActiveVersion - 1] are automatically + * deprecated in ZK by the controller logic. + * Example: + * - Let us assume the existing finalized feature in ZK: + * { + * "feature_1" -> FinalizedVersionRange(minVersionLevel=1, maxVersionLevel=5) + * } + * Now, supposing you would like to deprecate feature version levels: [1, 2]. + * Then, in the supportedFeatures map you should supply the following: + * supportedFeatures = { + * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, maxVersion=5) + * } + * - If you do NOT want to deprecate a version level for a feature, then in the supportedFeatures + * map you should supply the firstActiveVersion to be the same as the minVersion supplied for that + * feature. + * Example: + * supportedFeatures = { + * "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, maxVersion=5) + * } + * This indicates no intent to deprecate any version levels for the feature. + * + * STEP 2: + * After the first step is over, you may (at some point) want to permanently remove the code/logic + * for the functionality offered by the deprecated feature versions. This is the second step. Here a + * subsequent major Kafka release is made with another Broker code change that removes the code for + * the functionality offered by the deprecated feature versions. This would completely drop support + * for the deprecated versions. Such a code change needs to be supplemented by supplying a + * suitable higher minVersion value for the feature in the supportedFeatures map. + * Example: + * - In the example above in step 1, we showed how to deprecate version levels [1, 2] for + * "feature_1". Now let us assume the following finalized feature in ZK (after the deprecation + * has been carried out): + * { + * "feature_1" -> FinalizedVersionRange(minVersionLevel=3, maxVersionLevel=5) + * } + * Now, supposing you would like to permanently remove support for feature versions: [1, 2]. + * Then, in the supportedFeatures map you should now supply the following: + * supportedFeatures = { + * "feature1" -> SupportedVersionRange(minVersion=3, firstActiveVersion=3, maxVersion=5) Review comment: This is a really good point. Yes, I feel it is useful to expose it to the client via `ApiVersionsResponse`. I can change the KIP suitably and then update the PR. ########## File path: core/src/main/scala/kafka/controller/KafkaController.scala ########## @@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { + info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") + zkClient.createFeatureZNode(newNode) + val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { + info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") + zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (string) and a range of versions (defined by a + * SupportedVersionRange). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (string) and a range of version levels (defined + * by a FinalizedVersionRange). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + * A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + * setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + * the possible supported features finalized immediately. Assuming this is the case, the + * controller will start up and notice that the FeatureZNode is absent in the new cluster, + * it will then create a FeatureZNode (with enabled status) containing the entire list of + * default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + * Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + * broker binary has been upgraded to a newer version that supports the feature versioning + * system (KIP-584). This means the user is upgrading from an earlier version of the broker + * binary. In this case, we want to start with no finalized features and allow the user to + * finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + * to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + * features. This process ensures we do not enable all the possible features immediately after + * an upgrade, which could be harmful to Kafka. + * This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + * controller will start up and check if the FeatureZNode is absent. If absent, it will + * react by creating a FeatureZNode with disabled status and empty finalized features. + * Otherwise, if a node already exists in enabled status then the controller will just + * flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + * KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + * and whether it is disabled. In such a case, it won’t upgrade all features immediately. + * Instead it will just switch the FeatureZNode status to enabled status. This lets the + * user finalize the features later. + * + * 3. Broker binary upgraded, with existing cluster IBP config >= KAFKA_2_7_IV0: + * Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, and the broker binary + * has just been upgraded to a newer version (that supports IBP config KAFKA_2_7_IV0 and higher). + * The controller will start up and find that a FeatureZNode is already present with enabled + * status and existing finalized features. In such a case, the controller needs to scan the + * existing finalized features and mutate them for the purpose of version level deprecation + * (if needed). + * This is how we handle this case: If an existing finalized feature is present in the default + * finalized features, then, its existing minimum version level is updated to the default Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org