kowshik commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r434344257



##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and 
max, each of type long.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version 
values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final long minValue;

Review comment:
       Done. I have made it `int16` now. Great point.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import com.fasterxml.jackson.annotation.JsonProperty
 import com.fasterxml.jackson.core.JsonProcessingException
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_2_6_IV1, LeaderAndIsr}

Review comment:
       Done. Made it KAFKA_2_7_IV0.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and 
max, each of type long.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version 
values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final long minValue;
+
+    // Non-empty label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private final String maxKeyLabel;
+
+    // The value of the maximum version.
+    private final long maxValue;
+
+    /**
+     * Raises an exception unless the following condition is met:
+     * minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
+     *
+     * @param minKeyLabel   Label for the min version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param minValue      The minimum version value.
+     * @param maxKeyLabel   Label for the max version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param maxValue      The maximum version value.
+     *
+     * @throws IllegalArgumentException   If any of the following conditions 
are true:
+     *                                     - (minValue < 1) OR (maxValue < 1) 
OR (maxValue < minValue).
+     *                                     - minKeyLabel is empty, OR, maxKeyLabel is empty.
+     */
+    protected BaseVersionRange(String minKeyLabel, long minValue, String 
maxKeyLabel, long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        if (minKeyLabel.isEmpty()) {
+            throw new IllegalArgumentException("Expected minKeyLabel to be 
non-empty.");
+        }
+        if (maxKeyLabel.isEmpty()) {
+            throw new IllegalArgumentException("Expected maxKeyLabel to be 
non-empty.");
+        }
+        this.minKeyLabel = minKeyLabel;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public long min() {
+        return minValue;
+    }
+
+    public long max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%d, %d]", this.getClass().getSimpleName(), 
min(), max());

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -54,7 +64,7 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: 
Option[String]) {
     s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull}"
 
   def this(id: Int, host: String, port: Int, listenerName: ListenerName, 
protocol: SecurityProtocol) = {
-    this(id, Seq(EndPoint(host, port, listenerName, protocol)), None)
+    this(id, Seq(EndPoint(host, port, listenerName, protocol)), None, 
emptySupportedFeatures)

Review comment:
       Done. Nice catch!

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,85 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: 
Features[FinalizedVersionRange], epoch: Int) {
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A common mutable cache containing the latest finalized features and epoch. 
By default the contents of
+ * the cache are empty. This cache needs to be populated at least once for its contents to become
+ * non-empty. Currently the main reader of this cache is the read path that 
serves an ApiVersionsRequest,
+ * returning the features information in the response.
+ *
+ * @see FinalizedFeatureChangeListener
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -196,6 +219,19 @@ object BrokerIdZNode {
     *   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},
     *   "rack":"dc1"
     * }
+    *
+    * Version 5 (current) JSON schema for a broker is:
+    * {
+    *   "version":5,
+    *   "host":"localhost",
+    *   "port":9092,
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+    *   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"},

Review comment:
       Done. Removed. Great catch!

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and 
max, each of type long.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version 
values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final long minValue;
+
+    // Non-empty label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private final String maxKeyLabel;
+
+    // The value of the maximum version.
+    private final long maxValue;
+
+    /**
+     * Raises an exception unless the following condition is met:
+     * minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
+     *
+     * @param minKeyLabel   Label for the min version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param minValue      The minimum version value.
+     * @param maxKeyLabel   Label for the max version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param maxValue      The maximum version value.
+     *
+     * @throws IllegalArgumentException   If any of the following conditions 
are true:
+     *                                     - (minValue < 1) OR (maxValue < 1) 
OR (maxValue < minValue).
+     *                                     - minKeyLabel is empty, OR, maxKeyLabel is empty.
+     */
+    protected BaseVersionRange(String minKeyLabel, long minValue, String 
maxKeyLabel, long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +
+                    " minValue: %d, maxValue: %d", minValue, maxValue));
+        }
+        if (minKeyLabel.isEmpty()) {
+            throw new IllegalArgumentException("Expected minKeyLabel to be 
non-empty.");
+        }
+        if (maxKeyLabel.isEmpty()) {
+            throw new IllegalArgumentException("Expected maxKeyLabel to be 
non-empty.");
+        }
+        this.minKeyLabel = minKeyLabel;
+        this.minValue = minValue;
+        this.maxKeyLabel = maxKeyLabel;
+        this.maxValue = maxValue;
+    }
+
+    public long min() {
+        return minValue;
+    }
+
+    public long max() {
+        return maxValue;
+    }
+
+    public String toString() {
+        return String.format("%s[%d, %d]", this.getClass().getSimpleName(), 
min(), max());
+    }
+
+    public Map<String, Long> serialize() {

Review comment:
       Done.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +127,44 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
Optional.empty(), Optional.empty());
+    }
+
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        long finalizedFeaturesEpoch) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
Optional.of(finalizedFeatures), Optional.of(finalizedFeaturesEpoch));
+    }
+
+    private static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Optional<Features<FinalizedVersionRange>> finalizedFeatures,
+        Optional<Long> finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == 
DEFAULT_THROTTLE_TIME) {
             return DEFAULT_API_VERSIONS_RESPONSE;
         }
-        return createApiVersionsResponse(throttleTimeMs, maxMagic);
+        return createApiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
finalizedFeatures, finalizedFeaturesEpoch);
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int 
throttleTimeMs, final byte minMagic) {
+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,
+        final byte minMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Optional<Features<FinalizedVersionRange>> finalizedFeatures,
+        Optional<Long> finalizedFeaturesEpoch

Review comment:
       I had added such APIs previously, but @abbccdda wanted them removed, as they are not currently used. Please refer to this comment: 
https://github.com/apache/kafka/pull/8680#discussion_r426931875.
   Please let me know, and I can add them back if you prefer.

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,22 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{
+    new Broker(id, endPoints, rack, emptySupportedFeatures)
+  }
 }
 
 /**
  * A Kafka broker.
- * A broker has an id, a collection of end-points, an optional rack and a 
listener to security protocol map.
- * Each end-point is (host, port, listenerName).
+ *
+ * @param id          a broker id
+ * @param endPoints   a collection of: end-point and a listener to security 
protocol map.

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -442,6 +445,8 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
   val ControlledShutdownRetryBackoffMsProp = 
"controlled.shutdown.retry.backoff.ms"
   val ControlledShutdownEnableProp = "controlled.shutdown.enable"
+  /** ********* Features configuration ***********/
+  val FeatureChangeListenerCacheUpdateWaitTimeMsProp = 
"feature.listener.cache.update.wait.ms"

Review comment:
       Done. Great point!

##########
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##########
@@ -42,6 +42,33 @@
         "about": "The maximum supported version, inclusive." }
     ]},
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." }
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
+    { "name":  "SupportedFeatures", "type": "[]SupportedFeatureKey",
+      "versions":  "3+", "tag": 0, "taggedVersions": "3+",
+      "about": "Features supported by the broker.",
+      "fields":  [
+        { "name": "Name", "type": "string", "versions": "3+", "mapKey": true,
+          "about": "The name of the feature." },
+        { "name": "MinVersion", "type": "int64", "versions": "3+",
+          "about": "The minimum supported version for the feature." },
+        { "name": "MaxVersion", "type": "int64", "versions": "3+",
+          "about": "The maximum supported version for the feature." }
+      ]
+    },
+    {"name": "FinalizedFeaturesEpoch", "type": "int64", "versions": "3+",

Review comment:
       Done. Changed to `int32` now. Great point!

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and 
max, each of type long.
+ * The min and max attributes need to satisfy 2 rules:
+ *  - they are each expected to be >= 1, as we only consider positive version 
values to be valid.
+ *  - max should be >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {
+    // Non-empty label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private final String minKeyLabel;
+
+    // The value of the minimum version.
+    private final long minValue;
+
+    // Non-empty label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private final String maxKeyLabel;
+
+    // The value of the maximum version.
+    private final long maxValue;
+
+    /**
+     * Raises an exception unless the following condition is met:
+     * minValue >= 1 and maxValue >= 1 and maxValue >= minValue.
+     *
+     * @param minKeyLabel   Label for the min version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param minValue      The minimum version value.
+     * @param maxKeyLabel   Label for the max version key, that's used only for
+     *                      serialization/deserialization purposes.
+     * @param maxValue      The maximum version value.
+     *
+     * @throws IllegalArgumentException   If any of the following conditions 
are true:
+     *                                     - (minValue < 1) OR (maxValue < 1) 
OR (maxValue < minValue).
+     *                                     - minKeyLabel is empty, OR, maxKeyLabel is empty.
+     */
+    protected BaseVersionRange(String minKeyLabel, long minValue, String 
maxKeyLabel, long maxValue) {
+        if (minValue < 1 || maxValue < 1 || maxValue < minValue) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Expected minValue > 1, maxValue > 1 and maxValue >= 
minValue, but received" +

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,85 @@
+package kafka.server

Review comment:
       Done.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +127,44 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
Optional.empty(), Optional.empty());
+    }
+
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Features<FinalizedVersionRange> finalizedFeatures,
+        long finalizedFeaturesEpoch) {
+        return apiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
Optional.of(finalizedFeatures), Optional.of(finalizedFeaturesEpoch));
+    }
+
+    private static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<SupportedVersionRange> latestSupportedFeatures,
+        Optional<Features<FinalizedVersionRange>> finalizedFeatures,

Review comment:
       It's because non-existing supported features can be represented by an empty map (i.e. the broker does not advertise any features). Non-existing finalized features, on the other hand, cannot be represented by an empty map alone, since we also need a suitable epoch value that indicates the absence of finalized features. To address this case, I saw 2 ways:
   1) Provide a negative epoch value indicating the absence of finalized features, OR
   2) Represent both the finalized features and the epoch with an empty `Optional`.
   
   I chose the latter approach (see the sketch below). Please let me know if you have concerns.
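
   As a minimal Scala sketch of the trade-off (hypothetical names; this is not the PR's code):

```scala
// Hypothetical sketch: an empty map is enough to express "no supported
// features", but absent finalized features must also have an absent epoch,
// which Option models without resorting to a sentinel epoch value.
case class FinalizedState(features: Map[String, (Long, Long)], epoch: Long)

// Approach 1 (rejected): a sentinel epoch, e.g. -1, means "absent".
val absentWithSentinel = FinalizedState(Map.empty, epoch = -1L)

// Approach 2 (chosen): absence is a single empty Option, so no magic value
// exists and an empty feature map can never be paired with a stale epoch.
val absent: Option[FinalizedState] = None
val present: Option[FinalizedState] =
  Some(FinalizedState(Map("group_coordinator" -> ((1L, 2L))), epoch = 5L))
```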

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,219 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      debug(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status, clearing feature cache.")
+          FinalizedFeatureCache.clear()
+        } else if (featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully. This method returns
+     * immediately if an updateLatestOrThrow call had already completed 
successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             RuntimeException if the thread was interrupted 
during wait
+     *
+     *                     TimeoutException if the wait cannot be completed in waitTimeMs
+     *                     milliseconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(

Review comment:
       Done. Removed the catch clause and exception wrapping.
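
   For reference, a sketch of the resulting shape, assuming the only change is that `InterruptedException` from `await()` now propagates to the caller:

```scala
// Sketch: same wait logic as before, minus the catch clause that used to
// wrap InterruptedException in a RuntimeException.
def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
  maybeNotifyOnce.foreach { notifier =>
    if (!notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)) {
      throw new TimeoutException(
        s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.")
    }
  }
}
```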

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,219 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      debug(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status, clearing feature cache.")
+          FinalizedFeatureCache.clear()
+        } else if (featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully. This method returns
+     * immediately if an updateLatestOrThrow call had already completed 
successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             RuntimeException if the thread was interrupted 
during wait
+     *
+     *                     TimeoutException if the wait cannot be completed in waitTimeMs
+     *                     milliseconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Change notification queue 
interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)

Review comment:
       It kills the broker because `ShutdownableThread` catches `FatalExitError` and triggers the exit sequence: 
https://github.com/apache/kafka/blob/b8d609c207ed3d1e678c2f1eb6f3cae637f92c30/core/src/main/scala/kafka/utils/ShutdownableThread.scala#L98-L102
   
   I have updated the comment to use the word "eventually".
   Regarding logging at fatal severity and continuing: the exception caught here almost always indicates a feature incompatibility, which means the broker could cause damage if it stuck around. That is why I felt it is better to kill the broker in such a rare incompatibility case.
   
   Please let me know your thoughts.
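
   For illustration, a simplified sketch of that pattern (not the actual `ShutdownableThread` source; see the link above for the real code):

```scala
// Simplified sketch: a FatalExitError escaping doWork() is caught by the
// run loop, which then terminates the process, so the broker eventually exits.
override def run(): Unit = {
  try {
    while (isRunning) doWork()
  } catch {
    case e: FatalExitError =>
      error("Stopping due to fatal error", e)
      Exit.exit(e.statusCode()) // triggers the broker's exit sequence
  }
}
```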

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,219 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      debug(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status, clearing feature cache.")
+          FinalizedFeatureCache.clear()
+        } else if (featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully. This method returns
+     * immediately if an updateLatestOrThrow call had already completed 
successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             RuntimeException if the thread was interrupted 
during wait
+     *
+     *                     TimeoutException if the wait cannot be completed in waitTimeMs
+     *                     milliseconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Change notification queue 
interrupted", e)

Review comment:
       Done.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,219 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to notify 
the caller when an
+   *                            updateOrThrow() operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked exactly
+     * once successfully. A subsequent invocation will raise an exception.
+     *
+     * @throws   IllegalStateException, if a non-empty notifier was provided 
in the constructor, and
+     *           this method is called again after a successful previous 
invocation.
+     * @throws   FeatureCacheUpdateException, if there was an error in 
updating the
+     *           FinalizedFeatureCache.
+     * @throws   RuntimeException, if there was a failure in 
reading/deserializing the
+     *           contents of the feature ZK node.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      debug(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+
+      // There are 4 cases:
+      //
+      // (empty dataBytes, valid version)       => The empty dataBytes will 
fail FeatureZNode deserialization.
+      //                                           FeatureZNode, when present 
in ZK, can not have empty contents.
+      // (non-empty dataBytes, valid version)   => This is a valid case, and 
should pass FeatureZNode deserialization
+      //                                           if dataBytes contains valid 
data.
+      // (empty dataBytes, unknown version)     => This is a valid case, and 
this can happen if the FeatureZNode
+      //                                           does not exist in ZK.
+      // (non-empty dataBytes, unknown version) => This case is impossible, 
since, KafkaZkClient.getDataAndVersion
+      //                                           API ensures that unknown 
version is returned only when the
+      //                                           ZK node is absent. 
Therefore dataBytes should be empty in such
+      //                                           a case.
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status, clearing feature cache.")
+          FinalizedFeatureCache.clear()
+        } else if (featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully. This method returns
+     * immediately if an updateLatestOrThrow call had already completed 
successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             RuntimeException if the thread was interrupted 
during wait
+     *
+     *                     TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                     milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Change notification queue 
interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: ZK corruption or operational 
error).
+      // In such a case, we prefer to just log a warning and treat the case as 
if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      queue.add(new FeatureCacheUpdater(path))
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new 
ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, it also updates
+   * the FinalizedFeatureCache once with the latest contents of the feature ZK node
+   * (if the node exists). This step helps ensure that feature 
incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the 
caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to 
the caller, and the Broker
+   * will exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milliseconds to wait for the feature cache to be updated once.
+   *                                   If this parameter <= 0, no wait 
operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished 
in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {

Review comment:
       Done. I have changed the code to disallow values <= 0.
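
   A sketch of the revised validation (assumed shape; the final code may differ):

```scala
// Assumed revision: reject non-positive wait times up front instead of
// silently skipping the initial cache-update wait.
def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
  if (waitOnceForCacheUpdateMs <= 0) {
    throw new IllegalArgumentException(
      s"Expected waitOnceForCacheUpdateMs > 0, but provided: $waitOnceForCacheUpdateMs")
  }
  thread.start()
  zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
  // ... then enqueue a FeatureCacheUpdater with a latch and await it, as before.
}
```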

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,161 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+/**
+ * Represents the status of the FeatureZNode.
+ *
+ * Enabled  -> This status means the feature versioning system (KIP-584) is enabled, and the
+ *             finalized features stored in the FeatureZNode are active. This 
status is written by
+ *             the controller to the FeatureZNode only when the broker IBP 
config is greater than
+ *             or equal to KAFKA_2_6_IV1.
+ *
+ * Disabled -> This status means the feature versioning system (KIP-584) is disabled, and the
+ *             finalized features stored in the FeatureZNode are not relevant. This status is
+ *             written by the controller to the FeatureZNode only when the 
broker IBP config
+ *             is less than KAFKA_2_6_IV1.
+ *
+ * The purpose behind the FeatureZNodeStatus is that it helps differentiate between the following
+ * cases:
+ *
+ * 1. New cluster bootstrap:
+ *    For a new Kafka cluster (i.e. deployed for the first time), we would like to start the cluster
+ *    with all the possible supported features finalized immediately. The new 
cluster will almost
+ *    never be started with an old IBP config that’s less than KAFKA_2_6_IV1. 
In such a case, the
+ *    controller will start up and notice that the FeatureZNode is absent in 
the new cluster.
+ *    To handle the requirement, the controller will create a FeatureZNode 
(with enabled status)
+ *    containing the entire list of supported features as its finalized 
features.
+ *
+ * 2. Cluster upgrade:
+ *    Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_6_IV1, but
+ *    the Broker binary has been upgraded to a state where it supports the 
feature versioning
+ *    system (KIP-584). This means the user is upgrading from an earlier 
version of the Broker
+ *    binary. In this case, we want to start with no finalized features and 
allow the user to enable
+ *    them whenever they are ready i.e. in the future whenever the user sets 
IBP config
+ *    to be greater than or equal to KAFKA_2_6_IV1. The reason is that 
enabling all the possible
+ *    features immediately after an upgrade could be harmful to the cluster.
+ *    In such a case:
+ *      - Before the Broker upgrade (i.e. IBP config set to less than 
KAFKA_2_6_IV1), the controller
+ *        will start up and check if the FeatureZNode is absent. If true, then 
it will react by
+ *        creating a FeatureZNode with disabled status and empty features.
+ *      - After the Broker upgrade (i.e. IBP config set to greater than or 
equal to KAFKA_2_6_IV1),
+ *        when the controller starts up it will check if the FeatureZNode 
exists and whether it is
+ *        disabled. In such a case, it won’t upgrade all features immediately. 
Instead it will just
+ *        switch the FeatureZNode status to enabled status. This lets the user 
finalize the features
+ *        later.
+ *
+ * 3. Cluster downgrade:
+ *    Imagine that a Kafka cluster exists already and the IBP config is 
greater than or equal to
+ *    KAFKA_2_6_IV1. Then, the user decided to downgrade the cluster by 
setting IBP config to a
+ *    value less than KAFKA_2_6_IV1. This means the user is also disabling the 
feature versioning
+ *    system (KIP-584). In this case, when the controller starts up with the 
lower IBP config, it
+ *    will switch the FeatureZNode status to disabled with empty features.
+ */
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+/**
+ * Represents the contents of the ZK node containing finalized feature 
information.
+ *
+ * @param status     the status of the ZK node
+ * @param features   the cluster-wide finalized features
+ */
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: 
Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // V0 contains 'version', 'status' and 'features' keys.
+  val V0 = 0
+  val CurrentVersion = V0
+
+  def path = "/feature"
+
+  def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, 
util.Map[String, java.lang.Long]] = {
+    scalaMap
+      .view.mapValues(_.view.mapValues(scalaLong => 
java.lang.Long.valueOf(scalaLong)).toMap.asJava)
+      .toMap
+      .asJava
+  }
+
+  /**
+   * Encodes a FeatureZNode to JSON.
+   *
+   * @param featureZNode   FeatureZNode to be encoded
+   *
+   * @return               JSON representation of the FeatureZNode, as an 
Array[Byte]
+   */
+  def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(
+      VersionKey -> CurrentVersion,
+      StatusKey -> featureZNode.status.id,

Review comment:
       Sure. I will be happy to follow up on this. To make sure I understand the process: should I update the KIP and send an FYI email to `d...@kafka.apache.org`?

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,76 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common immutable object used in the Broker to define the latest features 
supported by the
+ * Broker. Also provides API to check for incompatibilities between the latest 
features supported
+ * by the Broker and cluster-wide finalized features.
+ *
+ * NOTE: the update() and clear() APIs of this class should be used only for 
testing purposes.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * The latest features supported by the Broker.
+   * This is currently empty, but in the future, as we define supported features,
+   * this map should be populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns a reference to the latest features supported by the Broker.
+   */
+  def get: Features[SupportedVersionRange] = {
+    supportedFeatures
+  }
+
+  // For testing only.
+  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // For testing only.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided finalized feature. This can happen because a 
provided finalized
+   * feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the 
supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The subset of input features which are incompatible. 
If the returned object
+   *                    is empty, it means there were no feature 
incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): 
Features[FinalizedVersionRange] = {
+    val incompatibilities = finalized.features.asScala.collect {
+      case (feature, versionLevels) => {
+        val supportedVersions = supportedFeatures.get(feature)
+        if (supportedVersions == null) {
+          (feature, versionLevels, "{feature=%s, reason='Unsupported 
feature'}".format(feature))
+        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+          (feature, versionLevels, "{feature=%s, reason='%s is incompatible 
with %s'}".format(
+            feature, versionLevels, supportedVersions))
+        } else {
+          (feature, versionLevels, null)
+        }
+      }
+    }.filter{ case(_, _, errorReason) => errorReason != null}.toList
+
+    if (incompatibilities.nonEmpty) {
+      warn("Feature incompatibilities seen: " + incompatibilities.map{ case(_, 
_, errorReason) => errorReason })
+    }
+    Features.finalizedFeatures(incompatibilities.map(item => (item._1, 
item._2)).toMap.asJava)

Review comment:
       Done.
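
   For context, a hedged sketch of one way the computation can avoid the null markers (the actual change in the PR may differ): compute an optional incompatibility reason per feature and keep only the entries that have one.

```scala
// Sketch: flatMap with Option instead of collect-then-filter on null markers;
// the behavior is intended to be equivalent to the original.
def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
  val incompatible = finalized.features.asScala.flatMap { case (feature, versionLevels) =>
    val supported = supportedFeatures.get(feature)
    if (supported == null)
      Some((feature, versionLevels, s"{feature=$feature, reason='Unsupported feature'}"))
    else if (versionLevels.isIncompatibleWith(supported))
      Some((feature, versionLevels, s"{feature=$feature, reason='$versionLevels is incompatible with $supported'}"))
    else
      None
  }
  if (incompatible.nonEmpty)
    warn("Feature incompatibilities seen: " + incompatible.map { case (_, _, reason) => reason })
  Features.finalizedFeatures(incompatible.map { case (f, v, _) => f -> v }.toMap.asJava)
}
```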




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

