abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r428370254



##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {

Review comment:
       Do we want to get a unit test class for `BaseVersionRange`?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<SupportedVersionRange> 
supportedFeatures(Map<String, SupportedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to FinalizedVersionRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<FinalizedVersionRange> 
finalizedFeatures(Map<String, FinalizedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    // Visible for testing.
+    public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<SupportedVersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public Map<String, VersionRangeType> features() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    /**
+     * @param  feature   name of the feature
+     *
+     * @return           the VersionRangeType corresponding to the feature 
name, or null if absent
+     */
+    public VersionRangeType get(String feature) {
+        return features.get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   A map with underlying features serialized. The returned value 
can be deserialized
+     *           using one of the deserialize* APIs.

Review comment:
       `deserialize()`? I think the second sentence is redundant.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of {@link BaseVersionRange} representing a range of 
version levels.
+ * NOTE: This is the backing class used to define the min/max version levels 
for finalized features.
+ */
+public class FinalizedVersionRange extends BaseVersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_LEVEL_KEY_LABEL = 
"min_version_level";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_LEVEL_KEY_LABEL = 
"max_version_level";
+
+    public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) {
+        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, 
MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
+    }
+
+    public static FinalizedVersionRange deserialize(Map<String, Long> 
serialized) {
+        return new FinalizedVersionRange(
+            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, 
serialized),
+            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, 
serialized));
+    }
+
+    private boolean isCompatibleWith(BaseVersionRange versionRange) {

Review comment:
       Should be `SupportedVersionRange` 

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;

Review comment:
       Yeah, the reasoning is that the `get` call blindly looks up inside 
`features`, so a null map is not valid here. And I don't think passing 
`null` makes sense for the caller, correct?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of {@link BaseVersionRange} representing a range of 
version levels.
+ * NOTE: This is the backing class used to define the min/max version levels 
for finalized features.
+ */
+public class FinalizedVersionRange extends BaseVersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_LEVEL_KEY_LABEL = 
"min_version_level";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_LEVEL_KEY_LABEL = 
"max_version_level";
+
+    public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) {
+        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, 
MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
+    }
+
+    public static FinalizedVersionRange deserialize(Map<String, Long> 
serialized) {
+        return new FinalizedVersionRange(
+            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, 
serialized),
+            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, 
serialized));
+    }
+
+    private boolean isCompatibleWith(BaseVersionRange versionRange) {
+        return min() >= versionRange.min() && max() <= versionRange.max();

Review comment:
       Just for the sake of argument, I feel we could remove this method and 
just test:
   ```
   min() < supportedVersionRange.min() || max() > supportedVersionRange.max()
   ```
   for incompatibility.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()

Review comment:
       Is it possible that no updater is ever enqueued, causing this function 
to block the thread indefinitely?

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
##########
@@ -0,0 +1,135 @@
+package org.apache.kafka.common.feature;

Review comment:
       Missing header

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
##########
@@ -17,13 +17,19 @@
 
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.feature.Features;

Review comment:
       Seems we didn't trigger style check on this new class.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java
##########
@@ -0,0 +1,135 @@
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FinalizedVersionRangeTest {

Review comment:
       What's the difference between this test class and the test class for 
its superclass? Same question for `SupportedVersionRangeTest`.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {
+      val errorMsg = ("FinalizedFeatureCache update failed due to invalid 
epoch in new finalized %s." +
+        " The existing finalized is %s").format(latest, existingStr)
+      throw new FeatureCacheUpdateException(errorMsg)
+    } else {
+      val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+      if (incompatibleFeatures.nonEmpty) {
+        val errorMsg = ("FinalizedFeatureCache updated failed since feature 
compatibility" +
+          " checks failed! Supported %s has incompatibilities with the latest 
finalized %s." +
+          " The incompatible features are: %s.").format(
+          SupportedFeatures.get, latest, incompatibleFeatures)
+        throw new FeatureCacheUpdateException(errorMsg)
+      }
+    }
+    val logMsg = "Updated cache from existing finalized %s to latest finalized 
%s".format(

Review comment:
       Could we move this logic into the inner else? Like:
   ```
   else {
         val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
         if (incompatibleFeatures.nonEmpty) {
           val errorMsg = ("FinalizedFeatureCache updated failed since feature 
compatibility" +
             " checks failed! Supported %s has incompatibilities with the 
latest finalized %s." +
             " The incompatible features are: %s.").format(
             SupportedFeatures.get, latest, incompatibleFeatures)
           throw new FeatureCacheUpdateException(errorMsg)
         } else {
           val logMsg = "Updated cache from existing finalized %s to latest 
finalized %s".format(
             oldFeatureAndEpoch, latest)
           featuresAndEpoch = Some(latest)
           info(logMsg)
         }
       }
   ```
   It makes the if-else logic tighter.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait

Review comment:
       I don't think adding `-` here is an accepted Scala comment style; do 
you see a warning?

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -812,6 +817,8 @@ object KafkaConfig {
   val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for 
multiple reasons. This determines the number of retries when such failure 
happens"
   val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system 
needs time to recover from the state that caused the previous failure 
(Controller fail over, replica lag etc). This config determines the amount of 
time to wait before retrying."
   val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server"
+  /** ********* Feature configuration ***********/
+  val FeatureChangeListenerCacheUpdateWaitTimeMsDoc = "# of milli seconds to 
wait for feature cache to be updated once."

Review comment:
       `wait time for the first feature cache update upon initialization`

##########
File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
##########
@@ -811,8 +828,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Seq.empty, zkClient.getSortedBrokerList)
     assertEquals(None, zkClient.getBroker(0))
 
-    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, 
SecurityProtocol.PLAINTEXT)
-    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, 
SecurityProtocol.SSL)
+    val brokerInfo0 = createBrokerInfo(

Review comment:
       If we are not validating the features by extracting them, I think we do 
not need to pass in a non-empty feature list?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of {@link BaseVersionRange} representing a range of 
version levels.
+ * NOTE: This is the backing class used to define the min/max version levels 
for finalized features.
+ */
+public class FinalizedVersionRange extends BaseVersionRange {
+    // Label for the min version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MIN_VERSION_LEVEL_KEY_LABEL = 
"min_version_level";
+
+    // Label for the max version key, that's used only for 
serialization/deserialization purposes.
+    private static final String MAX_VERSION_LEVEL_KEY_LABEL = 
"max_version_level";
+
+    public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) {
+        super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, 
MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel);
+    }
+
+    public static FinalizedVersionRange deserialize(Map<String, Long> 
serialized) {
+        return new FinalizedVersionRange(
+            BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, 
serialized),
+            BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, 
serialized));
+    }
+
+    private boolean isCompatibleWith(BaseVersionRange versionRange) {
+        return min() >= versionRange.min() && max() <= versionRange.max();
+    }
+
+    /**
+     * Checks if the [min, max] version level range of this object does *NOT* 
fall within the
+     * [min, max] version range of the provided SupportedVersionRange 
parameter.
+     *
+     * @param versionRange   the SupportedVersionRange to be checked

Review comment:
       nit: supportedVersionRange

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -146,7 +162,14 @@ object BrokerIdZNode {
     val plaintextEndpoint = broker.endPoints.find(_.securityProtocol == 
SecurityProtocol.PLAINTEXT).getOrElse(
       new EndPoint(null, -1, null, null))
     encode(brokerInfo.version, plaintextEndpoint.host, plaintextEndpoint.port, 
broker.endPoints, brokerInfo.jmxPort,
-      broker.rack)
+      broker.rack, broker.features)
+  }
+
+  def asJavaMap(brokerInfo: JsonObject): util.Map[String, util.Map[String, 
java.lang.Long]] = {

Review comment:
       s/asJavaMap/featuresAsJavaMap

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,74 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[SupportedVersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided cluster-wide finalized feature. This can happen 
because a provided
+   * cluster-wide finalized feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the 
supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned 
set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): 
Set[String] = {

Review comment:
       I'm slightly inclined to return a set of features instead of just 
strings, and make the string conversion a helper. But I leave this up to you 
to decide, and we could always adapt the function to make it more useful in 
other scenarios as needed.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -81,17 +83,26 @@ object BrokerIdsZNode {
 object BrokerInfo {
 
   /**
-   * Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
-   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json 
format otherwise.
+   * - Create a broker info with v5 json format if the apiVersion is 2.6.x or 
above.
+   * - Create a broker info with v4 json format (which includes multiple 
endpoints and rack) if
+   *   the apiVersion is 0.10.0.X or above but lesser than 2.6.x.
+   * - Register the broker with v2 json format otherwise.
    *
    * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON 
version is above 2.
    *
-   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 
to 0.10.0.X or above without having to
-   * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any 
case).
+   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 
to 0.10.0.X or above
+   * without having to upgrade to 0.9.0.1 first (clients have to be upgraded 
to 0.9.0.1 in
+   * any case).
    */
   def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo 
= {
-    // see method documentation for the reason why we do this
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    val version = {
+      if (apiVersion >= KAFKA_0_10_0_IV1)

Review comment:
       aha, the order is wrong for `KAFKA_0_10_0_IV1` and `KAFKA_2_6_IV1`

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: 
Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // Version0 contains 'version', 'status' and 'features' keys.
+  val Version0 = 0
+  val CurrentVersion = Version0
+
+  def path = "/feature"
+
+  def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, 
util.Map[String, java.lang.Long]] = {
+    scalaMap
+      .view.mapValues(_.view.mapValues(scalaLong => 
java.lang.Long.valueOf(scalaLong)).toMap.asJava)
+      .toMap
+      .asJava
+  }
+
+  def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(
+      VersionKey -> CurrentVersion,
+      StatusKey -> featureZNode.status.id,
+      FeaturesKey -> featureZNode.features.serialize)
+    Json.encodeAsBytes(jsonMap.asJava)
+  }
+
+  def decode(jsonBytes: Array[Byte]): FeatureZNode = {
+    Json.tryParseBytes(jsonBytes) match {
+      case Right(js) =>
+        val featureInfo = js.asJsonObject
+        val version = featureInfo(VersionKey).to[Int]
+        if (version < Version0 || version > CurrentVersion) {
+          throw new KafkaException(s"Unsupported version: $version of feature 
information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+
+        val featuresMap = featureInfo
+          .get(FeaturesKey)
+          .flatMap(_.to[Option[Map[String, Map[String, Long]]]])
+        if (featuresMap.isEmpty) {
+          throw new KafkaException("Features map can not be absent in: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val features = asJavaMap(featuresMap.get)
+
+        val statusInt = featureInfo
+          .get(StatusKey)
+          .flatMap(_.to[Option[Int]])
+        if (statusInt.isEmpty) {
+          throw new KafkaException("Status can not be absent in feature 
information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val status = FeatureZNodeStatus.withNameOpt(statusInt.get)
+        if (status.isEmpty) {

Review comment:
       Could we log `statusInt` here as well? Also, I feel the exception should be 
thrown from `FeatureZNodeStatus.withNameOpt`.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<SupportedVersionRange> 
supportedFeatures(Map<String, SupportedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to FinalizedVersionRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<FinalizedVersionRange> 
finalizedFeatures(Map<String, FinalizedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    // Visible for testing.
+    public static Features<FinalizedVersionRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<SupportedVersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public Map<String, VersionRangeType> features() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    /**
+     * @param  feature   name of the feature
+     *
+     * @return           the VersionRangeType corresponding to the feature 
name, or null if absent
+     */
+    public VersionRangeType get(String feature) {
+        return features.get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   A map with underlying features serialized. The returned value 
can be deserialized
+     *           using one of the deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserialize a map to Features<FinalizedVersionRange>.
+     *
+     * @param serialized   the serialized representation of a 
Features<FinalizedVersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<FinalizedVersionRange> 
object
+     */
+    public static Features<FinalizedVersionRange> deserializeFinalizedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return finalizedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> 
FinalizedVersionRange.deserialize(entry.getValue()))));
+    }
+
+    /**
+     * Deserializes a map to Features<SupportedVersionRange>.
+     *
+     * @param serialized   the serialized representation of a 
Features<SupportedVersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<SupportedVersionRange> 
object
+     */
+    public static Features<SupportedVersionRange> deserializeSupportedFeatures(

Review comment:
       Maybe I'm a bit too obsessive about code duplication, but after I made 
an attempt I thought we could actually have the internal deserialization logic 
shared between `deserializeFinalizedFeatures` and 
`deserializeSupportedFeatures` by making a template
   ```
    public static Features<FinalizedVersionRange> 
deserializeFinalizedFeatures(Map<String, Map<String, Long>> serialized) {
           return deserializeFeatures(serialized, 
FinalizedVersionRange::deserialize);
       }
   
       public static Features<SupportedVersionRange> 
deserializeSupportedFeatures(
           Map<String, Map<String, Long>> serialized) {
           return deserializeFeatures(serialized, 
SupportedVersionRange::deserialize);
       }
           
       
       private interface Deserializer<V> {
           V deserialize(Map<String, Long> serialized);
       }
   
   
       private static <V extends BaseVersionRange> Features<V> 
deserializeFeatures(Map<String, Map<String, Long>> serialized, Deserializer<V> 
deserializer) {
           return new Features<>(serialized.entrySet().stream().collect(
               Collectors.toMap(
                   Map.Entry::getKey,
                   entry -> deserializer.deserialize(entry.getValue()))));
       }
   ```

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,88 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: 
Features[FinalizedVersionRange], epoch: Int) {
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * {@link FinalizedFeatureChangeListener}.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest

Review comment:
       I think we don't need to talk about future work inside the comment, just 
making it clear that the read path for serving ApiVersionsRequest is the only 
reader as of now.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.Map;
+
+/**
+ * A specialization of VersionRange representing a range of versions.
+ * NOTE: This is the backing class used to define the min/max versions for 
supported features.

Review comment:
       Why is this a `NOTE`? Could we just comment like:
   ```
   An extended BaseVersionRange representing the min/max versions for supported 
features.
   ```

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification

Review comment:
       nit: provide

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {

Review comment:
       Does the existence of the version field guarantee that there is a valid 
feature data node? In fact, `getDataAndVersion` returns optional data. I checked 
the `getDataAndVersion` caller `ProducerIdManager`, and it has handling for empty 
data, which I feel we should have as well. 
   Additionally, since we haven't implemented the write path yet, could 
we get a ticket with a short description of what the write path should 
look like, defining the different cases such as:
   ```
   empty dataBytes, valid version 
   valid dataBytes, valid version 
   empty dataBytes, unknown version 
   valid dataBytes, unknown version 
   ```
   if that makes sense, so that we could keep track of the design decisions we 
made in the read path PR when implementing the write path.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")

Review comment:
       Do we need this log message to be at info level?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {

Review comment:
       feel neutral about this helper function

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as 
if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new 
ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, 
it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the 
feature ZK node
+   * (if the node exists). This step helps ensure that feature 
incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the 
caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to 
the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature 
cache to be updated once.
+   *                                   If this parameter <= 0, no wait 
operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished 
in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {
+      val barrier = new CountDownLatch(1)

Review comment:
       nit: I don't feel strongly about having this parameter.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as 
if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new 
ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, 
it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the 
feature ZK node
+   * (if the node exists). This step helps ensure that feature 
incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the 
caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to 
the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature 
cache to be updated once.
+   *                                   If this parameter <= 0, no wait 
operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished 
in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)

Review comment:
       As an educational question: does the zkClient have a separate thread to 
do the node change monitoring?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {

Review comment:
       Could we summarize the possible thrown errors in the comment as well? 
For example, should a JSON deserialization error be treated as fatal?

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: 
Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {

Review comment:
       I feel it might be worth creating a separate thread to discuss whether we 
could benefit from the automated protocol generation framework here, as 
I think this could be easily represented as JSON if we define it in the common 
package like other RPC data. The difficulty right now is mostly in the 
serialization and deserialization of the features themselves, but these could have 
workarounds if we want to do so.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as 
if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new 
ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, 
it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the 
feature ZK node
+   * (if the node exists). This step helps ensure that feature 
incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the 
caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to 
the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature 
cache to be updated once.
+   *                                   If this parameter <= 0, no wait 
operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished 
in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {
+      val barrier = new CountDownLatch(1)
+      val ensureCacheUpdateOnce = new 
FeatureCacheUpdater(FeatureZNodeChangeHandler.path, Some(barrier))
+      queue.add(ensureCacheUpdateOnce)
+      try {
+        ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
+      } catch {
+        case e: Exception => {
+          close()
+          throw e
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the feature ZK node change listener by unregistering the listener 
from ZK client,
+   * clearing the queue and shutting down the 
ChangeNotificationProcessorThread.
+   */
+  def close(): Unit = {
+    zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)

Review comment:
       Does the order matter here? I was wondering if there is any concurrency 
issue if we unregister before the queue and thread get cleaned up.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)

Review comment:
       `Feature cache update gets interrupted`

##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
         /* setup zookeeper */
         initZkClient(time)
 
+        /* initialize features */
+        _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient)
+        if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) {
+          // The feature versioning system (KIP-584) is active only when:

Review comment:
       I think the comment is not necessary, since we have already commented on 
`KAFKA_2_6_IV1`

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,203 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      // This event may happen, rarely (ex: operational error).
+      // In such a case, we prefer to just log a warning and treat the case as 
if the node is absent,
+      // and populate the FinalizedFeatureCache with empty finalized features.
+      processNotification()
+    }
+  }
+
+  private val queue = new LinkedBlockingQueue[FeatureCacheUpdater]
+
+  private val thread = new 
ChangeNotificationProcessorThread("feature-zk-node-event-process-thread")
+
+  /**
+   * This method initializes the feature ZK node change listener. Optionally, 
it also ensures to
+   * update the FinalizedFeatureCache once with the latest contents of the 
feature ZK node
+   * (if the node exists). This step helps ensure that feature 
incompatibilities (if any) in brokers
+   * are conveniently detected before the initOrThrow() method returns to the 
caller. If feature
+   * incompatibilities are detected, this method will throw an Exception to 
the caller, and the Broker
+   * would exit eventually.
+   *
+   * @param waitOnceForCacheUpdateMs   # of milli seconds to wait for feature 
cache to be updated once.
+   *                                   If this parameter <= 0, no wait 
operation happens.
+   *
+   * @throws Exception if feature incompatibility check could not be finished 
in a timely manner
+   */
+  def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = {
+    thread.start()
+    
zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler)
+
+    if (waitOnceForCacheUpdateMs > 0) {
+      val barrier = new CountDownLatch(1)
+      val ensureCacheUpdateOnce = new 
FeatureCacheUpdater(FeatureZNodeChangeHandler.path, Some(barrier))
+      queue.add(ensureCacheUpdateOnce)
+      try {
+        ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs)
+      } catch {
+        case e: Exception => {
+          close()
+          throw e
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes the feature ZK node change listener by unregistering the listener 
from ZK client,
+   * clearing the queue and shutting down the 
ChangeNotificationProcessorThread.
+   */
+  def close(): Unit = {
+    zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)
+    queue.clear()
+    thread.shutdown()
+    thread.join()
+  }
+
+  // Useful for testing.

Review comment:
       We could just comment `For testing only`

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,74 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.
+   */
+  def get: Features[SupportedVersionRange] = {
+    supportedFeatures
+  }
+
+  // Should be used only for testing.
+  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+    supportedFeatures = newFeatures
+  }
+
+  // Should be used only for testing.
+  def clear(): Unit = {
+    supportedFeatures = emptySupportedFeatures
+  }
+
+  /**
+   * Returns the set of feature names found to be 'incompatible'.
+   * A feature incompatibility is a version mismatch between the latest 
feature supported by the
+   * Broker, and the provided cluster-wide finalized feature. This can happen 
because a provided
+   * cluster-wide finalized feature:
+   *  1) Does not exist in the Broker (i.e. it is unknown to the Broker).
+   *           [OR]
+   *  2) Exists but the FinalizedVersionRange does not match with the 
supported feature's SupportedVersionRange.
+   *
+   * @param finalized   The finalized features against which incompatibilities 
need to be checked for.
+   *
+   * @return            The set of incompatible feature names. If the returned 
set is empty, it
+   *                    means there were no feature incompatibilities found.
+   */
+  def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): 
Set[String] = {
+    val incompatibilities = finalized.features.asScala.collect {
+      case (feature, versionLevels) => {
+        val supportedVersions = supportedFeatures.get(feature);
+        if (supportedVersions == null) {
+          (feature, "{feature=%s, reason='Unsupported 
feature'}".format(feature))
+        } else if (versionLevels.isIncompatibleWith(supportedVersions)) {
+          (feature, "{feature=%s, reason='Finalized %s is incompatible with 
supported %s'}".format(
+            feature, versionLevels, supportedVersions))
+        } else {
+          (feature, null)
+        }
+      }
+    }.filter(entry => entry._2 != null)
+
+    if (incompatibilities.nonEmpty) {

Review comment:
       This logging is duplicated

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,74 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {
+
+  /**
+   * This is the latest features supported by the Broker.
+   * This is currently empty, but in the future as we define supported 
features, this map should be
+   * populated.
+   */
+  @volatile private var supportedFeatures = emptySupportedFeatures
+
+  /**
+   * Returns the latest features supported by the Broker.

Review comment:
       nit: Returns a reference to the latest features supported by the broker.

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala
##########
@@ -0,0 +1,200 @@
+package kafka.server
+
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
+
+import kafka.utils.{Logging, ShutdownableThread}
+import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion}
+import kafka.zookeeper.ZNodeChangeHandler
+import org.apache.kafka.common.internals.FatalExitError
+
+import scala.concurrent.TimeoutException
+
+/**
+ * Listens to changes in the ZK feature node, via the ZK client. Whenever a 
change notification
+ * is received from ZK, the feature cache in FinalizedFeatureCache is 
asynchronously updated
+ * to the latest features read from ZK. The cache updates are serialized 
through a single
+ * notification processor thread.
+ *
+ * @param zkClient     the Zookeeper client
+ */
+class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging {
+
+  /**
+   * Helper class used to update the FinalizedFeatureCache.
+   *
+   * @param featureZkNodePath   the path to the ZK feature node to be read
+   * @param maybeNotifyOnce     an optional latch that can be used to propvide 
notification
+   *                            when an update operation is over
+   */
+  private class FeatureCacheUpdater(featureZkNodePath: String, 
maybeNotifyOnce: Option[CountDownLatch]) {
+
+    def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty)
+
+    /**
+     * Updates the feature cache in FinalizedFeatureCache with the latest 
features read from the
+     * ZK node in featureZkNodePath. If the cache update is not successful, 
then, a suitable
+     * exception is raised.
+     *
+     * NOTE: if a notifier was provided in the constructor, then, this method 
can be invoked
+     * only exactly once successfully.
+     */
+    def updateLatestOrThrow(): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        if (notifier.getCount != 1) {
+          throw new IllegalStateException(
+            "Can not notify after updateLatestOrThrow was called more than 
once successfully.")
+        }
+      })
+
+      info(s"Reading feature ZK node at path: $featureZkNodePath")
+      val (mayBeFeatureZNodeBytes, version) = 
zkClient.getDataAndVersion(featureZkNodePath)
+      if (version == ZkVersion.UnknownVersion) {
+        info(s"Feature ZK node at path: $featureZkNodePath does not exist")
+        FinalizedFeatureCache.clear()
+      } else {
+        val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get)
+        if (featureZNode.status == FeatureZNodeStatus.Disabled) {
+          info(s"Feature ZK node at path: $featureZkNodePath is in disabled 
status")
+          FinalizedFeatureCache.clear()
+        } else if(featureZNode.status == FeatureZNodeStatus.Enabled) {
+          FinalizedFeatureCache.updateOrThrow(featureZNode.features, version)
+        } else {
+          throw new IllegalStateException(s"Unexpected FeatureZNodeStatus 
found in $featureZNode")
+        }
+      }
+
+      maybeNotifyOnce.foreach(notifier => notifier.countDown())
+    }
+
+    /**
+     * Waits until at least a single updateLatestOrThrow completes 
successfully.
+     * NOTE: The method returns immediately if an updateLatestOrThrow call has 
already completed
+     * successfully.
+     *
+     * @param waitTimeMs   the timeout for the wait operation
+     *
+     * @throws             - RuntimeException if the thread was interrupted 
during wait
+     *                     - TimeoutException if the wait can not be completed 
in waitTimeMs
+     *                       milli seconds
+     */
+    def awaitUpdateOrThrow(waitTimeMs: Long): Unit = {
+      maybeNotifyOnce.foreach(notifier => {
+        var success = false
+        try {
+          success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS)
+        } catch {
+          case e: InterruptedException =>
+            throw new RuntimeException(
+              "Unable to wait for FinalizedFeatureCache update to finish.", e)
+        }
+
+        if (!success) {
+          throw new TimeoutException(
+            s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to 
be updated.")
+        }
+      })
+    }
+  }
+
+  /**
+   * A shutdownable thread to process feature node change notifications that 
are populated into the
+   * queue. If any change notification can not be processed successfully 
(unless it is due to an
+   * interrupt), the thread treats it as a fatal event and triggers Broker 
exit.
+   *
+   * @param name   name of the thread
+   */
+  private class ChangeNotificationProcessorThread(name: String) extends 
ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      try {
+        queue.take.updateLatestOrThrow()
+      } catch {
+        case e: InterruptedException => info(s"Interrupted", e)
+        case e: Exception => {
+          error("Failed to process feature ZK node change event. The broker 
will exit.", e)
+          throw new FatalExitError(1)
+        }
+      }
+    }
+  }
+
+  // Feature ZK node change handler.
+  object FeatureZNodeChangeHandler extends ZNodeChangeHandler {
+    override val path: String = FeatureZNode.path
+
+    private def processNotification(): Unit = {
+      queue.add(new FeatureCacheUpdater(path))
+    }
+
+    override def handleCreation(): Unit = {
+      info(s"Feature ZK node created at path: $path")
+      processNotification()
+    }
+
+    override def handleDataChange(): Unit = {
+      info(s"Feature ZK node updated at path: $path")
+      processNotification()
+    }
+
+    override def handleDeletion(): Unit = {
+      warn(s"Feature ZK node deleted at path: $path")
+      processNotification()

Review comment:
       I think even if this is an operational error, the cluster is at risk of 
violating the feature semantics previously enabled, which is different from an 
unknown feature version from the beginning. I feel we should just exit with a 
fatal error in this case, but I am open to discussion.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: 
Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // Version0 contains 'version', 'status' and 'features' keys.
+  val Version0 = 0

Review comment:
       Could we name it V0 for simplicity?

##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     createRecursive(path, data = null, throwIfPathExists = false)
   }
 
+  // Visible for testing.
+  def createFeatureZNode(nodeContents: FeatureZNode): Unit = {

Review comment:
       If that's the case, I feel we could remove the testing only comment.

##########
File path: core/src/main/scala/kafka/zk/ZkData.scala
##########
@@ -744,6 +782,90 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+object FeatureZNodeStatus extends Enumeration {
+  val Disabled, Enabled = Value
+
+  def withNameOpt(value: Int): Option[Value] = {
+    values.find(_.id == value)
+  }
+}
+
+case class FeatureZNode(status: FeatureZNodeStatus.Value, features: 
Features[FinalizedVersionRange]) {
+}
+
+object FeatureZNode {
+  private val VersionKey = "version"
+  private val StatusKey = "status"
+  private val FeaturesKey = "features"
+
+  // Version0 contains 'version', 'status' and 'features' keys.
+  val Version0 = 0
+  val CurrentVersion = Version0
+
+  def path = "/feature"
+
+  def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, 
util.Map[String, java.lang.Long]] = {
+    scalaMap
+      .view.mapValues(_.view.mapValues(scalaLong => 
java.lang.Long.valueOf(scalaLong)).toMap.asJava)
+      .toMap
+      .asJava
+  }
+
+  def encode(featureZNode: FeatureZNode): Array[Byte] = {
+    val jsonMap = collection.mutable.Map(
+      VersionKey -> CurrentVersion,
+      StatusKey -> featureZNode.status.id,
+      FeaturesKey -> featureZNode.features.serialize)
+    Json.encodeAsBytes(jsonMap.asJava)
+  }
+
+  def decode(jsonBytes: Array[Byte]): FeatureZNode = {
+    Json.tryParseBytes(jsonBytes) match {
+      case Right(js) =>
+        val featureInfo = js.asJsonObject
+        val version = featureInfo(VersionKey).to[Int]
+        if (version < Version0 || version > CurrentVersion) {
+          throw new KafkaException(s"Unsupported version: $version of feature 
information: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+
+        val featuresMap = featureInfo
+          .get(FeaturesKey)
+          .flatMap(_.to[Option[Map[String, Map[String, Long]]]])
+        if (featuresMap.isEmpty) {
+          throw new KafkaException("Features map can not be absent in: " +
+            s"${new String(jsonBytes, UTF_8)}")
+        }
+        val features = asJavaMap(featuresMap.get)
+
+        val statusInt = featureInfo
+          .get(StatusKey)
+          .flatMap(_.to[Option[Int]])
+        if (statusInt.isEmpty) {
+          throw new KafkaException("Status can not be absent in feature 
information: " +

Review comment:
       Is there a more specific exception type for deserialization errors? I 
feel KafkaException is a bit too general compared with IllegalArgumentException.

##########
File path: 
core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -0,0 +1,228 @@
+package kafka.server
+
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, 
ZooKeeperTestHarness}
+import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.internals.FatalExitError
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertThrows, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
+
+class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {

Review comment:
       Could we extract some common initialization logic for the tests to 
reduce duplication?

##########
File path: 
core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala
##########
@@ -0,0 +1,228 @@
+package kafka.server
+
+import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, 
ZooKeeperTestHarness}
+import kafka.utils.{Exit, TestUtils}
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.internals.FatalExitError
+import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, 
assertThrows, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.concurrent.TimeoutException
+import scala.jdk.CollectionConverters._
+
+class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness {
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    FinalizedFeatureCache.clear()
+    SupportedFeatures.clear()
+  }
+
+  /**
+   * Tests that the listener can be initialized, and that it can listen to ZK 
notifications
+   * successfully from an "Enabled" FeatureZNode (the ZK data has no feature 
incompatibilities).
+   */
+  @Test
+  def testInitSuccessAndNotificationSuccess(): Unit = {
+    val supportedFeatures = Map[String, SupportedVersionRange](
+      "feature_1" -> new SupportedVersionRange(1, 4),
+      "feature_2" -> new SupportedVersionRange(1, 3))
+    
SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava))
+
+    val initialFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 3))
+    val initialFinalizedFeatures = 
Features.finalizedFeatures(initialFinalizedFeaturesMap.asJava)
+    zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, 
initialFinalizedFeatures))
+    val (mayBeFeatureZNodeBytes, initialVersion) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(initialVersion, ZkVersion.UnknownVersion)
+    assertFalse(mayBeFeatureZNodeBytes.isEmpty)
+
+    val listener = new FinalizedFeatureChangeListener(zkClient)
+    assertFalse(listener.isListenerInitiated)
+    assertTrue(FinalizedFeatureCache.empty)
+    listener.initOrThrow(15000)
+    assertTrue(listener.isListenerInitiated)
+    val mayBeNewCacheContent = FinalizedFeatureCache.get
+    assertFalse(mayBeNewCacheContent.isEmpty)
+    val newCacheContent = mayBeNewCacheContent.get
+    assertEquals(initialFinalizedFeatures, newCacheContent.features)
+    assertEquals(initialVersion, newCacheContent.epoch)
+
+    val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 4))
+    val updatedFinalizedFeatures = 
Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava)
+    zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, 
updatedFinalizedFeatures))
+    val (mayBeFeatureZNodeNewBytes, updatedVersion) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(updatedVersion, ZkVersion.UnknownVersion)
+    assertFalse(mayBeFeatureZNodeNewBytes.isEmpty)
+    assertTrue(updatedVersion > initialVersion)
+    TestUtils.waitUntilTrue(() => {
+      
FinalizedFeatureCache.get.get.equals(FinalizedFeaturesAndEpoch(updatedFinalizedFeatures,
 updatedVersion))
+    }, "Timed out waiting for FinalizedFeatureCache to be updated with new 
features")
+    assertTrue(listener.isListenerInitiated)
+  }
+
+  /**
+   * Tests that the listener can be initialized, and that it can process 
FeatureZNode deletion
+   * successfully.
+   */
+  @Test
+  def testFeatureZNodeDeleteNotificationProcessing(): Unit = {
+    val supportedFeatures = Map[String, SupportedVersionRange](
+      "feature_1" -> new SupportedVersionRange(1, 4),
+      "feature_2" -> new SupportedVersionRange(1, 3))
+    
SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava))
+
+    val initialFinalizedFeaturesMap = Map[String, FinalizedVersionRange](
+      "feature_1" -> new FinalizedVersionRange(2, 3))
+    val initialFinalizedFeatures = 
Features.finalizedFeatures(initialFinalizedFeaturesMap.asJava)
+    zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, 
initialFinalizedFeatures))
+    val (mayBeFeatureZNodeBytes, initialVersion) = 
zkClient.getDataAndVersion(FeatureZNode.path)
+    assertNotEquals(initialVersion, ZkVersion.UnknownVersion)
+    assertFalse(mayBeFeatureZNodeBytes.isEmpty)
+
+    val listener = new FinalizedFeatureChangeListener(zkClient)
+    assertFalse(listener.isListenerInitiated)
+    assertTrue(FinalizedFeatureCache.empty)
+    listener.initOrThrow(15000)
+    assertTrue(listener.isListenerInitiated)
+    val mayBeNewCacheContent = FinalizedFeatureCache.get
+    assertFalse(mayBeNewCacheContent.isEmpty)
+    val newCacheContent = mayBeNewCacheContent.get
+    assertEquals(initialFinalizedFeatures, newCacheContent.features)
+    assertEquals(initialVersion, newCacheContent.epoch)
+
+    zkClient.deleteFeatureZNode()

Review comment:
       I'm a bit surprised; do we want to support feature znode deletion in 
the long term?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
##########
@@ -776,6 +776,9 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
 
+        //Feature configuration

Review comment:
       nit: space




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to