junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r496093216



##########
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##########
@@ -55,8 +55,8 @@
           "about": "The maximum supported version for the feature." }
       ]
     },
-    {"name": "FinalizedFeaturesEpoch", "type": "int32", "versions": "3+",
-      "tag": 1, "taggedVersions": "3+", "default": "-1",
+    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",

Review comment:
       Nit: add a space before "name" so it reads { "name": ... }.

##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 57,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",

Review comment:
       The timeoutMs field is not included in the KIP. Should we update the KIP?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {
+    private final short minVersionLevel;
+
+    private final short maxVersionLevel;
+
+    /**
+     * Raises an exception unless the following condition is met:
+     * minVersionLevel >= 1 and maxVersionLevel >= 1 and maxVersionLevel >= 
minVersionLevel.
+     *
+     * @param minVersionLevel   The minimum version level value.
+     * @param maxVersionLevel   The maximum version level value.
+     *
+     * @throws IllegalArgumentException   Raised when the condition described 
above is not met.
+     */
+    public FinalizedVersionRange(final short minVersionLevel, final short 
maxVersionLevel) {

Review comment:
       Since the user is not expected to instantiate this, should we make the 
constructor non-public?
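
For illustration only (this is not code from the PR): a minimal sketch of what a non-public constructor could look like, assuming package-private visibility is enough for the admin client internals to build instances. The class name `FinalizedVersionRangeSketch` is hypothetical; the validation condition is the one stated in the Javadoc quoted above.

```java
// Illustrative sketch only; the class name is hypothetical and this is not the PR's code.
final class FinalizedVersionRangeSketch {
    private final short minVersionLevel;
    private final short maxVersionLevel;

    // No access modifier: package-private, so only code in the same package
    // can construct instances. Users can still read the values via the accessors.
    FinalizedVersionRangeSketch(final short minVersionLevel, final short maxVersionLevel) {
        // Condition taken from the Javadoc in the quoted hunk.
        if (minVersionLevel < 1 || maxVersionLevel < 1 || maxVersionLevel < minVersionLevel) {
            throw new IllegalArgumentException(String.format(
                "Expected minVersionLevel >= 1, maxVersionLevel >= 1 and"
                    + " maxVersionLevel >= minVersionLevel, but received"
                    + " minVersionLevel: %d, maxVersionLevel: %d",
                minVersionLevel, maxVersionLevel));
        }
        this.minVersionLevel = minVersionLevel;
        this.maxVersionLevel = maxVersionLevel;
    }

    public short minVersionLevel() {
        return minVersionLevel;
    }

    public short maxVersionLevel() {
        return maxVersionLevel;
    }
}
```

With this shape, callers outside the package can only obtain instances from the API result objects, which seems to be the intent of the question.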

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Encapsulates details about finalized as well as supported features. This is 
particularly useful
+ * to hold the result returned by the {@link 
Admin#describeFeatures(DescribeFeaturesOptions)} API.
+ */
+public class FeatureMetadata {
+
+    private final Map<String, FinalizedVersionRange> finalizedFeatures;
+
+    private final Optional<Long> finalizedFeaturesEpoch;
+
+    private final Map<String, SupportedVersionRange> supportedFeatures;
+
+    public FeatureMetadata(final Map<String, FinalizedVersionRange> 
finalizedFeatures,

Review comment:
       Since the user is not expected to instantiate this, should we make the 
constructor non-public?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#describeFeatures(DescribeFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends 
AbstractOptions<DescribeFeaturesOptions> {

Review comment:
       Since this is public facing, could we include the description in the KIP?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Map;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#updateFeatures(Map, UpdateFeaturesOptions)}.
+ *
+ * The API of this class is evolving. See {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class UpdateFeaturesOptions extends 
AbstractOptions<UpdateFeaturesOptions> {

Review comment:
       Are we adding the timeout option based on the KIP discussion?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FinalizedVersionRange.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of version levels supported by every broker in a cluster 
for some feature.
+ */
+public class FinalizedVersionRange {

Review comment:
       This seems identical to SupportedVersionRange. Should we just have one 
class, something like VersionRange?

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==================================
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at a those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:

Review comment:
       I guess after the first step, deprecated finalized versions are no 
longer advertised to the client, but they can still be used by existing 
connections?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1733,8 +1734,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       else if (!apiVersionRequest.isValid)
         apiVersionRequest.getErrorResponse(requestThrottleMs, 
Errors.INVALID_REQUEST.exception)
       else {
-        val supportedFeatures = SupportedFeatures.get
-        val finalizedFeatures = FinalizedFeatureCache.get
+        val supportedFeatures = brokerFeatures.supportedFeatures
+        val finalizedFeatures = featureCache.get

Review comment:
       Perhaps it's better for the following code to use match instead of if/else.

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==================================
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at a those version levels, across the 
entire Kafka cluster.

Review comment:
       "at a those " typo?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.defaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = 
Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = 
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map 
{
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = 
defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with 
$existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= 
existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= 
existingVersionRange.max()) {
+              // Using the change below, we deprecate all version levels in 
the range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() 
- 1].
+              //
+              // NOTE: if existingVersionRange.min() equals 
brokerDefaultVersionRange.min(), then
+              // we do not deprecate any version levels (since there is none 
to be deprecated).
+              //
+              // Examples:
+              // 1. brokerDefaultVersionRange = [4, 7] and 
existingVersionRange = [1, 5].
+              //    In this case, we deprecate all version levels in the 
range: [1, 3].
+              // 2. brokerDefaultVersionRange = [4, 7] and 
existingVersionRange = [4, 5].
+              //    In this case, we do not deprecate any version levels since
+              //    brokerDefaultVersionRange.min() equals 
existingVersionRange.min().
+              (featureName, new 
FinalizedVersionRange(brokerDefaultVersionRange.min(), 
existingVersionRange.max()))
+            } else {
+              // This is a serious error. We should never be reaching here, 
since we already
+              // verify once during KafkaServer startup that existing 
finalized feature versions in
+              // the FeatureZNode contained no incompatibilities. If we are 
here, it means that one
+              // of the following is true:
+              // 1. The existing version levels fall completely outside the 
range of the default
+              // finalized version levels (i.e. no intersection), or
+              // 2. The existing version levels are incompatible with default 
finalized version
+              // levels.
+              //
+              // Examples of invalid cases that can cause this exception to be 
triggered:
+              // 1. No intersection      : brokerDefaultVersionRange = [4, 7] 
and existingVersionRange = [2, 3].
+              // 2. No intersection      : brokerDefaultVersionRange = [2, 3] 
and existingVersionRange = [4, 7].
+              // 3. Incompatible versions: brokerDefaultVersionRange = [2, 3] 
and existingVersionRange = [1, 7].
+              throw new IllegalStateException(
+                s"Can not update minimum version level in finalized feature: 
$featureName,"
+                + s" since the existing $existingVersionRange is not eligible 
for a change"
+                + s" based on the default $brokerDefaultVersionRange. This 
should never happen"
+                + s" since feature version incompatibilities are already 
checked during"
+                + s" Kafka server startup.")
+            }
+        }.asJava)
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
newFeatures)
+      if (!newFeatureZNode.equals(existingFeatureZNode)) {
+        val newVersion = updateFeatureZNode(newFeatureZNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Disables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with disabled status. This status means the 
feature versioning system
+   * (KIP-584) is disabled, and, the finalized features stored in the 
FeatureZNode are not relevant.
+   * This status should be written by the controller to the FeatureZNode only 
when the broker
+   * IBP config is less than KAFKA_2_7_IV0.
+   *
+   * NOTE:
+   * 1. When this method returns, existing finalized features (if any) will be 
cleared from the
+   *    FeatureZNode.
+   * 2. This method, unlike enableFeatureVersioning() need not wait for the 
FinalizedFeatureCache
+   *    to be updated, because, such updates to the cache (via 
FinalizedFeatureChangeListener)
+   *    are disabled when IBP config is < than KAFKA_2_7_IV0.
+   */
+  private def disableFeatureVersioning(): Unit = {
+    val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, 
Features.emptyFinalizedFeatures())
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      createFeatureZNode(newNode)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      if (!existingFeatureZNode.status.equals(FeatureZNodeStatus.Disabled)) {
+        updateFeatureZNode(newNode)
+      }
+    }
+  }
+
+  private def setupFeatureVersioning(): Unit = {

Review comment:
       setupFeatureVersioning => mayBeSetupFeatureVersioning ?
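
For illustration only (not part of the PR): the minimum-version-level update performed per feature in the quoted enableFeatureVersioning() hunk can be sketched in Java as follows. The class `MinVersionLevelSketch`, its nested `Range` type, and the method `updatedRange` are hypothetical names; the branch conditions and the example ranges are copied from the hunk.

```java
// Illustrative sketch only; class and method names are hypothetical.
final class MinVersionLevelSketch {

    // Minimal stand-in for a closed [min, max] version range.
    static final class Range {
        final short min;
        final short max;

        Range(int min, int max) {
            this.min = (short) min;
            this.max = (short) max;
        }

        @Override
        public String toString() {
            return "[" + min + ", " + max + "]";
        }
    }

    // Mirrors the three branches applied to each existing finalized feature.
    static Range updatedRange(Range brokerDefault, Range existing) {
        if (brokerDefault == null) {
            // Feature is absent from the defaults: keep the existing range as-is.
            return existing;
        }
        if (brokerDefault.max >= existing.max && brokerDefault.min <= existing.max) {
            // Raise the minimum to the broker default; levels in
            // [existing.min, brokerDefault.min - 1] are thereby deprecated.
            return new Range(brokerDefault.min, existing.max);
        }
        // No intersection, or incompatible version levels.
        throw new IllegalStateException(
            "Existing range " + existing + " is not eligible for a change"
                + " based on the default " + brokerDefault);
    }

    public static void main(String[] args) {
        System.out.println(updatedRange(new Range(4, 7), new Range(1, 5))); // prints [4, 5]
        System.out.println(updatedRange(new Range(4, 7), new Range(4, 5))); // prints [4, 5]
    }
}
```

The invalid examples listed in the hunk (for instance default [4, 7] with existing [2, 3]) fall through to the IllegalStateException branch in this sketch.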

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3109,6 +3110,37 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFeatures(request: RequestChannel.Request): Unit = {
+    val updateFeaturesRequest = request.body[UpdateFeaturesRequest]
+
+    def sendResponseCallback(errors: Either[ApiError, Map[String, ApiError]]): 
Unit = {
+      def createResponse(throttleTimeMs: Int): UpdateFeaturesResponse = {
+        errors match {
+          case Left(topLevelError) => {
+            val featureUpdateNoErrors = updateFeaturesRequest
+              .data().featureUpdates().asScala

Review comment:
       I think the convention is that if there is a top-level error, the second 
level will just be empty, since there is no need to process the entries individually.
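
For illustration only (not part of the PR): a minimal sketch of that convention, assuming a simplified result type in place of the generated response classes. `UpdateFeaturesResultSketch`, its fields, and the error strings are hypothetical stand-ins.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; these types are hypothetical stand-ins,
// not Kafka's generated request/response classes.
final class UpdateFeaturesResultSketch {
    static final String NONE = "NONE";

    final String topLevelError;
    final Map<String, String> featureErrors;

    private UpdateFeaturesResultSketch(String topLevelError, Map<String, String> featureErrors) {
        this.topLevelError = topLevelError;
        this.featureErrors = featureErrors;
    }

    // Top-level failure: the per-feature level stays empty, since the
    // individual updates were never processed.
    static UpdateFeaturesResultSketch topLevelFailure(String error) {
        return new UpdateFeaturesResultSketch(error, Collections.emptyMap());
    }

    // No top-level error: report one entry per requested feature update.
    static UpdateFeaturesResultSketch perFeature(Map<String, String> featureErrors) {
        return new UpdateFeaturesResultSketch(
            NONE, Collections.unmodifiableMap(new LinkedHashMap<>(featureErrors)));
    }
}
```

A client reading such a result would check the top-level error first and only then iterate over the per-feature entries.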

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FeatureUpdate.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;

Review comment:
       This package is not part of the javadoc and thus is not part of the 
public interface.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -43,7 +43,7 @@
  */
 public class ApiVersionsResponse extends AbstractResponse {
 
-    public static final int UNKNOWN_FINALIZED_FEATURES_EPOCH = -1;
+    public static final long UNKNOWN_FINALIZED_FEATURES_EPOCH = -1;

Review comment:
       -1 =>  -1L?
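
For illustration only (not part of the PR): both spellings produce the same value, because Java implicitly widens the int literal when it is assigned to a long. The suffix only makes the intended type explicit at the declaration. The class and constant names below are hypothetical.

```java
// Illustrative sketch only; names are hypothetical.
final class LongLiteralSketch {
    static final long WIDENED = -1;   // int literal, implicitly widened to long
    static final long EXPLICIT = -1L; // long literal, type visible at a glance

    public static void main(String[] args) {
        System.out.println(WIDENED == EXPLICIT); // prints true
    }
}
```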

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.defaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      var newFeatures: Features[FinalizedVersionRange] = 
Features.emptyFinalizedFeatures()
+      if (existingFeatureZNode.status.equals(FeatureZNodeStatus.Enabled)) {
+        newFeatures = 
Features.finalizedFeatures(existingFeatureZNode.features.features().asScala.map 
{
+          case (featureName, existingVersionRange) =>
+            val brokerDefaultVersionRange = 
defaultFinalizedFeatures.get(featureName)
+            if (brokerDefaultVersionRange == null) {
+              warn(s"Existing finalized feature: $featureName with 
$existingVersionRange"
+                + s" is absent in default finalized $defaultFinalizedFeatures")
+              (featureName, existingVersionRange)
+            } else if (brokerDefaultVersionRange.max() >= 
existingVersionRange.max() &&
+                       brokerDefaultVersionRange.min() <= 
existingVersionRange.max()) {
+              // Using the change below, we deprecate all version levels in 
the range:
+              // [existingVersionRange.min(), brokerDefaultVersionRange.min() 
- 1].
+              //
+              // NOTE: if existingVersionRange.min() equals 
brokerDefaultVersionRange.min(), then
+              // we do not deprecate any version levels (since there is none 
to be deprecated).
+              //
+              // Examples:
+              // 1. brokerDefaultVersionRange = [4, 7] and 
existingVersionRange = [1, 5].
+              //    In this case, we deprecate all version levels in the 
range: [1, 3].
+              // 2. brokerDefaultVersionRange = [4, 7] and 
existingVersionRange = [4, 5].
+              //    In this case, we do not deprecate any version levels since
+              //    brokerDefaultVersionRange.min() equals 
existingVersionRange.min().
+              (featureName, new 
FinalizedVersionRange(brokerDefaultVersionRange.min(), 
existingVersionRange.max()))
+            } else {
+              // This is a serious error. We should never be reaching here, 
since we already
+              // verify once during KafkaServer startup that existing 
finalized feature versions in
+              // the FeatureZNode contained no incompatibilities. If we are 
here, it means that one
+              // of the following is true:
+              // 1. The existing version levels fall completely outside the 
range of the default
+              // finalized version levels (i.e. no intersection), or
+              // 2. The existing version levels are incompatible with default 
finalized version
+              // levels.
+              //
+              // Examples of invalid cases that can cause this exception to be 
triggered:
+              // 1. No intersection      : brokerDefaultVersionRange = [4, 7] 
and existingVersionRange = [2, 3].
+              // 2. No intersection      : brokerDefaultVersionRange = [2, 3] 
and existingVersionRange = [4, 7].
+              // 3. Incompatible versions: brokerDefaultVersionRange = [2, 3] 
and existingVersionRange = [1, 7].
+              throw new IllegalStateException(
+                s"Can not update minimum version level in finalized feature: 
$featureName,"
+                + s" since the existing $existingVersionRange is not eligible 
for a change"
+                + s" based on the default $brokerDefaultVersionRange. This 
should never happen"
+                + s" since feature version incompatibilities are already 
checked during"
+                + s" Kafka server startup.")
+            }
+        }.asJava)
+      }
+      val newFeatureZNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
newFeatures)
+      if (!newFeatureZNode.equals(existingFeatureZNode)) {
+        val newVersion = updateFeatureZNode(newFeatureZNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+      }
+    }
+  }
+
+  /**
+   * Disables the feature versioning system (KIP-584).
+   *
+   * Sets up the FeatureZNode with disabled status. This status means the 
feature versioning system
+   * (KIP-584) is disabled, and, the finalized features stored in the 
FeatureZNode are not relevant.
+   * This status should be written by the controller to the FeatureZNode only 
when the broker
+   * IBP config is less than KAFKA_2_7_IV0.
+   *
+   * NOTE:
+   * 1. When this method returns, existing finalized features (if any) will be 
cleared from the
+   *    FeatureZNode.
+   * 2. This method, unlike enableFeatureVersioning() need not wait for the 
FinalizedFeatureCache
+   *    to be updated, because, such updates to the cache (via 
FinalizedFeatureChangeListener)
+   *    are disabled when IBP config is < than KAFKA_2_7_IV0.
+   */
+  private def disableFeatureVersioning(): Unit = {
+    val newNode = FeatureZNode(FeatureZNodeStatus.Disabled, 
Features.emptyFinalizedFeatures())
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      createFeatureZNode(newNode)
+    } else {
+      val existingFeatureZNode = 
FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+      if (!existingFeatureZNode.status.equals(FeatureZNodeStatus.Disabled)) {
+        updateFeatureZNode(newNode)
+      }
+    }
+  }
+
+  private def setupFeatureVersioning(): Unit = {
+    if (config.isFeatureVersioningEnabled) {

Review comment:
       Perhaps isFeatureVersioningSupported is a better name?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default

Review comment:
       Could you define the default finalized features? Also, the reference to the
       default minimum version level seems outdated now.
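
       For reference, the update rule as it reads in the quoted hunk can be sketched
       as below. This is a minimal sketch under stated assumptions: the class and
       method names are hypothetical, and plain ints stand in for the
       FinalizedVersionRange objects. Note that the doc comment gives the deprecated
       range as the closed range [existing_min_version_level, default_min_version_level],
       while the inline comment in the hunk gives
       [existingVersionRange.min(), brokerDefaultVersionRange.min() - 1]; the sketch
       follows the inline comment.

```java
import java.util.Arrays;

// Hypothetical illustration only; not the actual KafkaController code.
final class MinLevelBumpSketch {
    // Returns {newMinLevel, newMaxLevel} for one finalized feature, mirroring the
    // condition in the quoted hunk: the default range must cover the existing max.
    static int[] bump(int defaultMin, int defaultMax, int existingMin, int existingMax) {
        if (defaultMax >= existingMax && defaultMin <= existingMax) {
            // Levels in [existingMin, defaultMin - 1] are deprecated by this change.
            return new int[] {defaultMin, existingMax};
        }
        throw new IllegalStateException(
            "Existing range [" + existingMin + ", " + existingMax + "] is not eligible"
            + " for a change based on default [" + defaultMin + ", " + defaultMax + "]");
    }

    public static void main(String[] args) {
        // Example 1 from the hunk: default [4, 7], existing [1, 5].
        System.out.println(Arrays.toString(bump(4, 7, 1, 5))); // [4, 5]
        // Example 2 from the hunk: default [4, 7], existing [4, 5]; nothing deprecated.
        System.out.println(Arrays.toString(bump(4, 7, 4, 5))); // [4, 5]
    }
}
```

       The "no intersection" examples listed in the hunk (for instance default [4, 7]
       with existing [2, 3]) fall through to the exception in this sketch.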

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -40,14 +40,16 @@ public static FinalizedVersionRange fromMap(Map<String, Short> versionRangeMap)
 
     /**
      * Checks if the [min, max] version level range of this object does *NOT* fall within the
-     * [min, max] version range of the provided SupportedVersionRange parameter.
+     * [min, first_active_version, max] range of the provided SupportedVersionRange parameter.

Review comment:
       Should we just verify the range [first_active_version, max]?
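
       A minimal sketch of what that narrower check might look like, assuming plain
       ints in place of the real FinalizedVersionRange and SupportedVersionRange
       types. The class and method names are hypothetical and are not taken from
       the PR.

```java
// Hypothetical illustration only; not the actual FinalizedVersionRange code.
final class ActiveRangeCheck {
    // Returns true when the finalized levels [minLevel, maxLevel] do NOT fall within
    // the supported active range [firstActiveVersion, maxVersion].
    static boolean isIncompatible(int minLevel, int maxLevel,
                                  int firstActiveVersion, int maxVersion) {
        return minLevel < firstActiveVersion || maxLevel > maxVersion;
    }

    public static void main(String[] args) {
        // Finalized [3, 5] against active range [3, 5]: compatible.
        System.out.println(isIncompatible(3, 5, 3, 5)); // false
        // Finalized [1, 5] against active range [3, 5]: levels 1 and 2 are inactive.
        System.out.println(isIncompatible(1, 5, 3, 5)); // true
    }
}
```

       With this shape, the supported minVersion would not take part in the
       comparison at all.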

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the latest features supported by the Broker and 
also provides APIs to
+ * check for incompatibilities between the features supported by the Broker 
and finalized features.
+ * The class also enables feature version level deprecation, as explained 
below. This class is
+ * immutable in production. It provides few APIs to mutate state only for the 
purpose of testing.
+ *
+ * Feature version level deprecation:
+ * ==================================
+ *
+ * Deprecation of certain version levels of a feature is a process to stop 
supporting the
+ * functionality offered by the feature at a those version levels, across the 
entire Kafka cluster.
+ * Feature version deprecation is a simple 2-step process explained below. In 
each step below, an
+ * example is provided to help understand the process better:
+ *
+ * STEP 1:
+ * In the first step, a major Kafka release is made with a Broker code change 
(explained later
+ * below) that establishes the intent to deprecate certain versions of one or 
more features
+ * cluster-wide. When this new Kafka release is deployed to the cluster, the 
feature versioning
+ * system (via the controller) will automatically persist the new 
minVersionLevel for the feature in
+ * Zk to propagate the deprecation of certain versions. After this happens, 
any external client that
+ * queries the Broker to learn the feature versions will at some point start 
to see the new value
+ * for the finalized minVersionLevel for the feature. This makes the version 
deprecation permanent.
+ *
+ * Here is how the above code change needs to be done:
+ * In order to deprecate feature version levels, in the supportedFeatures map 
you need to supply a
+ * specific firstActiveVersion value that's higher than the minVersion for the 
feature. The
+ * value for firstActiveVersion should be 1 beyond the highest version that 
you intend to deprecate
+ * for that feature. When features are finalized via the 
ApiKeys.UPDATE_FEATURES api, the feature
+ * version levels in the closed range: [minVersion, firstActiveVersion - 1] 
are automatically
+ * deprecated in ZK by the controller logic.
+ * Example:
+ * - Let us assume the existing finalized feature in ZK:
+ *   {
+ *      "feature_1" -> FinalizedVersionRange(minVersionLevel=1, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to deprecate feature version levels: [1, 2].
+ *   Then, in the supportedFeatures map you should supply the following:
+ *   supportedFeatures = {
+ *     "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=3, 
maxVersion=5)
+ *   }
+ * - If you do NOT want to deprecate a version level for a feature, then in 
the supportedFeatures
+ *   map you should supply the firstActiveVersion to be the same as the 
minVersion supplied for that
+ *   feature.
+ *   Example:
+ *   supportedFeatures = {
+ *     "feature1" -> SupportedVersionRange(minVersion=1, firstActiveVersion=1, 
maxVersion=5)
+ *   }
+ *   This indicates no intent to deprecate any version levels for the feature.
+ *
+ * STEP 2:
+ * After the first step is over, you may (at some point) want to permanently 
remove the code/logic
+ * for the functionality offered by the deprecated feature versions. This is 
the second step. Here a
+ * subsequent major Kafka release is made with another Broker code change that 
removes the code for
+ * the functionality offered by the deprecated feature versions. This would 
completely drop support
+ * for the deprecated versions. Such a code change needs to be supplemented by 
supplying a
+ * suitable higher minVersion value for the feature in the supportedFeatures 
map.
+ * Example:
+ * - In the example above in step 1, we showed how to deprecate version levels 
[1, 2] for
+ *   "feature_1". Now let us assume the following finalized feature in ZK 
(after the deprecation
+ *   has been carried out):
+ *   {
+ *     "feature_1" -> FinalizedVersionRange(minVersionLevel=3, 
maxVersionLevel=5)
+ *   }
+ *   Now, supposing you would like to permanently remove support for feature 
versions: [1, 2].
+ *   Then, in the supportedFeatures map you should now supply the following:
+ *   supportedFeatures = {
+ *     "feature1" -> SupportedVersionRange(minVersion=3, firstActiveVersion=3, 
maxVersion=5)

Review comment:
       Is it useful to expose firstActiveVersion to the client?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated 
into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {

Review comment:
       Indentation is off here.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -112,8 +112,9 @@ class KafkaApis(val requestChannel: RequestChannel,
                 brokerTopicStats: BrokerTopicStats,
                 val clusterId: String,
                 time: Time,
-                val tokenManager: DelegationTokenManager)
-  extends ApiRequestHandler with Logging {
+                val tokenManager: DelegationTokenManager,
+                val brokerFeatures: BrokerFeatures,
+                val featureCache: FinalizedFeatureCache) extends ApiRequestHandler with Logging {

Review comment:
       featureCache => finalizedFeatureCache?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -34,7 +34,7 @@ import scala.concurrent.TimeoutException
  *
  * @param zkClient     the Zookeeper client
  */
-class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+class FinalizedFeatureChangeListener(private val featureCache: FinalizedFeatureCache, private val zkClient: KafkaZkClient) extends Logging {

Review comment:
       featureCache => finalizedFeatureCache?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -272,6 +281,199 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (string) and a range of 
versions (defined by a
+   * SupportedVersionRange). It refers to a feature that a particular broker 
advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (string) and a range of 
version levels (defined
+   * by a FinalizedVersionRange). Whenever the feature versioning system 
(KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *    A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *    setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *    the possible supported features finalized immediately. Assuming this 
is the case, the
+   *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *    it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *    default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *    broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *    system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *    binary. In this case, we want to start with no finalized features and 
allow the user to
+   *    finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *    to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *    features. This process ensures we do not enable all the possible 
features immediately after
+   *    an upgrade, which could be harmful to Kafka.
+   *    This is how we handle such a case:
+   *      - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *        controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *        react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *        Otherwise, if a node already exists in enabled status then the 
controller will just
+   *        flip the status to disabled and clear the finalized features.
+   *      - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *        KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *        and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *        Instead it will just switch the FeatureZNode status to enabled 
status. This lets the
+   *        user finalize the features later.
+   *
+   * 3. Broker binary upgraded, with existing cluster IBP config >= 
KAFKA_2_7_IV0:
+   *    Imagine an existing Kafka cluster with IBP config >= KAFKA_2_7_IV0, 
and the broker binary
+   *    has just been upgraded to a newer version (that supports IBP config 
KAFKA_2_7_IV0 and higher).
+   *    The controller will start up and find that a FeatureZNode is already 
present with enabled
+   *    status and existing finalized features. In such a case, the controller 
needs to scan the
+   *    existing finalized features and mutate them for the purpose of version 
level deprecation
+   *    (if needed).
+   *    This is how we handle this case: If an existing finalized feature is 
present in the default
+   *    finalized features, then, its existing minimum version level is 
updated to the default
+   *    minimum version level maintained in the BrokerFeatures object. The 
goal of this mutation is
+   *    to permanently deprecate one or more feature version levels. The range 
of feature version
+   *    levels deprecated are from the closed range: 
[existing_min_version_level, default_min_version_level].
+   *    NOTE: Deprecating a feature version level is an incompatible change, 
which requires a major
+   *    release of Kafka. In such a release, the minimum version level 
maintained within the
+   *    BrokerFeatures class is updated suitably to record the deprecation of 
the feature.
+   *
+   * 4. Broker downgrade:
+   *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+   *    KAFKA_2_7_IV0. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+   *    value less than KAFKA_2_7_IV0. This means the user is also disabling 
the feature versioning
+   *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+   *    will switch the FeatureZNode status to disabled with empty features.
+   */
+  private def enableFeatureVersioning(): Unit = {
+    val defaultFinalizedFeatures = brokerFeatures.defaultFinalizedFeatures
+    val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    if (version == ZkVersion.UnknownVersion) {
+      val newVersion = createFeatureZNode(new 
FeatureZNode(FeatureZNodeStatus.Enabled, defaultFinalizedFeatures))
+      featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)

Review comment:
       Since we are including the timeout in the UpdateFeatures request, perhaps 
we could just use that timeout here.
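
       For reference, the four startup cases described in the doc comment above 
can be sketched as a small decision function. This is a simplified illustration, 
not the PR's code: `Node`, `VersionLevels` and `nodeOnStartup` are hypothetical 
stand-ins for FeatureZNode, FinalizedVersionRange and enableFeatureVersioning, 
and the ZK reads/writes and the cache wait are left out.

```scala
object FeatureZNodeStartup {
  sealed trait Status
  case object Enabled extends Status
  case object Disabled extends Status

  // Hypothetical stand-ins for FinalizedVersionRange and FeatureZNode.
  final case class VersionLevels(min: Int, max: Int)
  final case class Node(status: Status, features: Map[String, VersionLevels])

  // Computes the node the controller would write on startup.
  //   ibpSupportsVersioning: true when the IBP config is >= KAFKA_2_7_IV0
  //   existing:              the current node, or None when it is absent
  //   defaults:              the default finalized features known to the broker
  def nodeOnStartup(ibpSupportsVersioning: Boolean,
                    existing: Option[Node],
                    defaults: Map[String, VersionLevels]): Node = {
    if (!ibpSupportsVersioning) {
      // Cases 2 (before the IBP upgrade) and 4: disabled, no finalized features.
      Node(Disabled, Map.empty)
    } else existing match {
      // Case 1: new cluster, finalize all default features immediately.
      case None => Node(Enabled, defaults)
      // Case 2 (after the IBP upgrade): only flip the status to enabled.
      case Some(Node(Disabled, features)) => Node(Enabled, features)
      // Case 3: raise the minimum version level to the default where one exists.
      case Some(Node(Enabled, features)) =>
        val updated = features.map { case (name, levels) =>
          name -> defaults.get(name).map(d => levels.copy(min = d.min)).getOrElse(levels)
        }
        Node(Enabled, updated)
    }
  }
}
```

       The sketch ignores the validation the real controller would need in 
case 3, for example an existing max level that is below the new default min.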

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated 
into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Can not delete non-existing finalized feature."))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+                           s" than 1 without setting the allowDowngrade flag 
to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) 
"downgrade" else "upgrade"}" +
+                               s" a finalized feature from existing 
maxVersionLevel:${existing.max}" +
+                               " to the same value."))
+          } else if (update.maxVersionLevel < existing.max && 
!update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the 
allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature from 
existing" +
+                               s" maxVersionLevel:${existing.max} to provided" 
+
+                               s" maxVersionLevel:${update.maxVersionLevel} 
without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > 
existing.max) {
+            // Disallow a request that sets allowDowngrade flag without 
specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"When the allowDowngrade flag set in the 
request, the provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} 
can not be greater than" +
+                               s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel < existing.min) {
+            // Disallow downgrade of a finalized feature below the existing 
finalized
+            // minVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature to 
maxVersionLevel:${update.maxVersionLevel}" +
+                               s" because it's lower than the existing 
minVersionLevel:${existing.min}."))
+          } else {
+            newVersionRangeOrError(update)
+          }
+        ).getOrElse(newVersionRangeOrError(update))
+      }
+    }
+  }
+
+  private def processFeatureUpdates(request: UpdateFeaturesRequest,
+                                    callback: UpdateFeaturesCallback): Unit = {
+    if (isActive) {
+      processFeatureUpdatesWithActiveController(request, callback)
+    } else {
+      callback(Left(new ApiError(Errors.NOT_CONTROLLER)))
+    }
+  }
+
+  private def processFeatureUpdatesWithActiveController(request: 
UpdateFeaturesRequest,
+                                                        callback: 
UpdateFeaturesCallback): Unit = {
+    val updates = request.data.featureUpdates
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+    // A map with key being feature name and value being FinalizedVersionRange.
+    // This contains the target features to be eventually written to 
FeatureZNode.
+    val targetFeatures = scala.collection.mutable.Map[String, 
FinalizedVersionRange]() ++ existingFeatures
+    // A map with key being feature name and value being error encountered 
when the FeatureUpdate
+    // was applied.
+    val errors = scala.collection.mutable.Map[String, ApiError]()
+
+    // Below we process each FeatureUpdate using the following logic:
+    //  - If a FeatureUpdate is found to be valid, then:
+    //    - The corresponding entry in errors map would be updated to contain 
ApiError(Errors.NONE).
+    //    - If the FeatureUpdate is an add or update request, then the 
targetFeatures map is updated
+    //      to contain the new FinalizedVersionRange for the feature.
+    //    - Otherwise if the FeatureUpdate is a delete request, then the 
feature is removed from the
+    //      targetFeatures map.
+    //  - Otherwise if a FeatureUpdate is found to be invalid, then:
+    //    - The corresponding entry in errors map would be updated with the 
appropriate ApiError.
+    //    - The entry in targetFeatures map is left untouched.
+    updates.asScala.iterator.foreach { update =>
+      validateFeatureUpdate(update, existingFeatures.get(update.feature())) 
match {
+        case Left(newVersionRangeOrNone) =>
+          newVersionRangeOrNone
+            .map(newVersionRange => targetFeatures += (update.feature() -> 
newVersionRange))
+            .getOrElse(targetFeatures -= update.feature())
+          errors += (update.feature() -> new ApiError(Errors.NONE))
+        case Right(featureUpdateFailureReason) =>
+          errors += (update.feature() -> featureUpdateFailureReason)
+      }
+    }
+
+    // If the existing and target features are the same, then, we skip the 
update to the
+    // FeatureZNode as no changes to the node are required. Otherwise, we 
replace the contents
+    // of the FeatureZNode with the new features. This may result in partial 
or full modification
+    // of the existing finalized features in ZK.
+    try {
+      if (!existingFeatures.equals(targetFeatures)) {
+        val newNode = new FeatureZNode(FeatureZNodeStatus.Enabled, 
Features.finalizedFeatures(targetFeatures.asJava))
+        val newVersion = zkClient.updateFeatureZNode(newNode)
+        featureCache.waitUntilEpochOrThrow(newVersion, 
config.zkConnectionTimeoutMs)
+      }
+    } catch {
+      // For all features that correspond to valid FeatureUpdate (i.e. error 
is Errors.NONE),
+      // we set the error as Errors.FEATURE_UPDATE_FAILED since the 
FeatureZNode update has failed
+      // for these. For the rest, the existing error is left untouched.
+      case e: Exception =>
+        errors.foreach { case (feature, apiError) =>
+          if (apiError.error() == Errors.NONE) {
+            errors(feature) = new ApiError(Errors.FEATURE_UPDATE_FAILED,
+                                           
Errors.FEATURE_UPDATE_FAILED.message() + " Error: " + e)

Review comment:
       Do we need to return the stacktrace to the caller? Since this is 
unexpected, perhaps we can log a warning instead?
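
       A minimal sketch of that alternative, assuming simplified stand-ins 
(`Err`, `Code`, `markFailed` and the `warn` callback are hypothetical, not Kafka's 
ApiError, Errors or logging API): the exception is logged on the controller 
side, and only a generic message goes back to the caller.

```scala
import scala.collection.mutable

object FeatureUpdateFailure {
  // Hypothetical stand-ins for Errors.* and ApiError.
  sealed trait Code
  case object NoError extends Code
  case object InvalidRequest extends Code
  case object FeatureUpdateFailed extends Code
  final case class Err(code: Code, message: String)

  // Marks every so-far-successful update as failed, without leaking the
  // exception text into the response. Entries that already carry an error
  // are left untouched.
  def markFailed(errors: mutable.Map[String, Err],
                 e: Exception,
                 warn: String => Unit): Unit = {
    // Log the details where operators can see them.
    warn(s"Processing of feature updates failed due to error: $e")
    // Collect the keys first so the map is not modified while iterating over it.
    val succeeded = errors.toList.collect {
      case (feature, err) if err.code == NoError => feature
    }
    succeeded.foreach { feature =>
      errors(feature) = Err(FeatureUpdateFailed, "Unable to update finalized features.")
    }
  }
}
```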

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1656,6 +1893,203 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors.INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return         the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+    if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+      throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+    }
+
+    val supportedVersionRange = 
brokerFeatures.supportedFeatures.get(update.feature)
+    if (supportedVersionRange == null) {
+      Right(new ApiError(Errors.INVALID_REQUEST,
+                         "Could not apply finalized feature update because the 
provided feature" +
+                         " is not supported."))
+    } else {
+      var newVersionRange: FinalizedVersionRange = null
+      try {
+        newVersionRange = new 
FinalizedVersionRange(supportedVersionRange.firstActiveVersion, 
update.maxVersionLevel)
+      } catch {
+        case _: IllegalArgumentException => {
+          // This exception means the provided maxVersionLevel is invalid. It 
is handled below
+          // outside of this catch clause.
+        }
+      }
+      if (newVersionRange == null) {
+        Right(new ApiError(Errors.INVALID_REQUEST,
+          "Could not apply finalized feature update because the provided" +
+          s" maxVersionLevel:${update.maxVersionLevel} is lower than the" +
+          s" first active 
version:${supportedVersionRange.firstActiveVersion}."))
+      } else {
+        val newFinalizedFeature =
+          Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+        val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+          BrokerFeatures.hasIncompatibleFeatures(broker.features, 
newFinalizedFeature)
+        })
+        if (numIncompatibleBrokers == 0) {
+          Left(newVersionRange)
+        } else {
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Could not apply finalized feature update 
because" +
+                             " brokers were found to have incompatible 
versions for the feature."))
+        }
+      }
+    }
+  }
+
+  /**
+   * Validates a feature update on an existing FinalizedVersionRange.
+   * If the validation succeeds, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the validation fails, then returned value contains a suitable ApiError.
+   *
+   * @param update                 the feature update to be processed.
+   * @param existingVersionRange   the existing FinalizedVersionRange which 
can be empty when no
+   *                               FinalizedVersionRange exists for the 
associated feature
+   *
+   * @return                       the new FinalizedVersionRange to be updated 
into ZK or error
+   *                               as described above.
+   */
+  private def validateFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey,
+                                   existingVersionRange: 
Option[FinalizedVersionRange]): Either[Option[FinalizedVersionRange], ApiError] 
= {
+    def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+      newFinalizedVersionRangeOrIncompatibilityError(update)
+        .fold(versionRange => Left(Some(versionRange)), error => Right(error))
+    }
+
+    if (update.feature.isEmpty) {
+      // Check that the feature name is not empty.
+      Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+    } else {
+      // We handle deletion requests separately from non-deletion requests.
+      if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+        if (existingVersionRange.isEmpty) {
+          // Disallow deletion of a non-existing finalized feature.
+          Right(new ApiError(Errors.INVALID_REQUEST,
+                             "Can not delete non-existing finalized feature."))
+        } else {
+          Left(Option.empty)
+        }
+      } else if (update.maxVersionLevel() < 1) {
+        // Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+        Right(new ApiError(Errors.INVALID_REQUEST,
+                           s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+                           s" than 1 without setting the allowDowngrade flag 
to true in the request."))
+      } else {
+        existingVersionRange.map(existing =>
+          if (update.maxVersionLevel == existing.max) {
+            // Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not ${if (update.allowDowngrade) 
"downgrade" else "upgrade"}" +
+                               s" a finalized feature from existing 
maxVersionLevel:${existing.max}" +
+                               " to the same value."))
+          } else if (update.maxVersionLevel < existing.max && 
!update.allowDowngrade) {
+            // Disallow downgrade of a finalized feature without the 
allowDowngrade flag set.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature from 
existing" +
+                               s" maxVersionLevel:${existing.max} to provided" 
+
+                               s" maxVersionLevel:${update.maxVersionLevel} 
without setting the" +
+                               " allowDowngrade flag in the request."))
+          } else if (update.allowDowngrade && update.maxVersionLevel > 
existing.max) {
+            // Disallow a request that sets allowDowngrade flag without 
specifying a
+            // maxVersionLevel that's lower than the existing maxVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"When the allowDowngrade flag set in the 
request, the provided" +
+                               s" maxVersionLevel:${update.maxVersionLevel} 
can not be greater than" +
+                               s" existing maxVersionLevel:${existing.max}."))
+          } else if (update.maxVersionLevel < existing.min) {
+            // Disallow downgrade of a finalized feature below the existing 
finalized
+            // minVersionLevel.
+            Right(new ApiError(Errors.INVALID_REQUEST,
+                               s"Can not downgrade finalized feature to 
maxVersionLevel:${update.maxVersionLevel}" +
+                               s" because it's lower than the existing 
minVersionLevel:${existing.min}."))
+          } else {
+            newVersionRangeOrError(update)
+          }
+        ).getOrElse(newVersionRangeOrError(update))
+      }
+    }
+  }
+
+  private def processFeatureUpdates(request: UpdateFeaturesRequest,
+                                    callback: UpdateFeaturesCallback): Unit = {
+    if (isActive) {
+      processFeatureUpdatesWithActiveController(request, callback)
+    } else {
+      callback(Left(new ApiError(Errors.NOT_CONTROLLER)))
+    }
+  }
+
+  private def processFeatureUpdatesWithActiveController(request: 
UpdateFeaturesRequest,
+                                                        callback: 
UpdateFeaturesCallback): Unit = {
+    val updates = request.data.featureUpdates
+    val existingFeatures = featureCache.get
+      .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+      .getOrElse(Map[String, FinalizedVersionRange]())
+    // A map with key being feature name and value being FinalizedVersionRange.
+    // This contains the target features to be eventually written to 
FeatureZNode.
+    val targetFeatures = scala.collection.mutable.Map[String, 
FinalizedVersionRange]() ++ existingFeatures
+    // A map with key being feature name and value being error encountered 
when the FeatureUpdate
+    // was applied.
+    val errors = scala.collection.mutable.Map[String, ApiError]()
+
+    // Below we process each FeatureUpdate using the following logic:
+    //  - If a FeatureUpdate is found to be valid, then:
+    //    - The corresponding entry in errors map would be updated to contain 
ApiError(Errors.NONE).
+    //    - If the FeatureUpdate is an add or update request, then the 
targetFeatures map is updated
+    //      to contain the new FinalizedVersionRange for the feature.
+    //    - Otherwise if the FeatureUpdate is a delete request, then the 
feature is removed from the
+    //      targetFeatures map.
+    //  - Otherwise if a FeatureUpdate is found to be invalid, then:
+    //    - The corresponding entry in errors map would be updated with the 
appropriate ApiError.
+    //    - The entry in targetFeatures map is left untouched.
+    updates.asScala.iterator.foreach { update =>
+      validateFeatureUpdate(update, existingFeatures.get(update.feature())) 
match {
+        case Left(newVersionRangeOrNone) =>
+          newVersionRangeOrNone
+            .map(newVersionRange => targetFeatures += (update.feature() -> 
newVersionRange))

Review comment:
       map() is supposed to be used without side effects. Perhaps we could use 
match here.
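
       Roughly along these lines, as a sketch with a plain mutable map standing 
in for targetFeatures (`VersionRange` and `applyUpdate` are illustrative names, 
not from the PR):

```scala
import scala.collection.mutable

object MatchInsteadOfMap {
  // Hypothetical stand-in for FinalizedVersionRange.
  final case class VersionRange(min: Int, max: Int)

  // Applies one validated update to the target map:
  // Some(range) adds or replaces the feature, None deletes it.
  def applyUpdate(target: mutable.Map[String, VersionRange],
                  feature: String,
                  newRangeOrNone: Option[VersionRange]): Unit = {
    newRangeOrNone match {
      case Some(newRange) => target(feature) = newRange
      case None => target -= feature
    }
  }
}
```

       With match, both branches are visibly statements executed for their 
effect, rather than a map() whose result is discarded.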



