abbccdda commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r426806321



##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {

Review comment:
       Is this function only used in unit test?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);

Review comment:
       Could be simplified as new Features<>

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.

Review comment:
       nit: we could use {@link VersionRangeType} to reference to the classes.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);

Review comment:
       nit: just a personal preference, but getting one less internal reference 
to a public function `all` makes the code usage check easier, like 
`features.get(feature)`. 

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;

Review comment:
       We should ensure `features` is not null

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, 
Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {

Review comment:
       s/AllAPI/GetAllFeatures

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+

Review comment:
       nit: extra line

##########
File path: core/src/main/scala/kafka/server/SupportedFeatures.scala
##########
@@ -0,0 +1,70 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionRange, 
VersionLevelRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A common object used in the Broker to define the latest features supported 
by the Broker.
+ * Also provides API to check for incompatibilities between the latest 
features supported by the
+ * Broker and cluster-wide finalized features.
+ */
+object SupportedFeatures extends Logging {

Review comment:
       nit: add a line

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java
##########
@@ -0,0 +1,162 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class VersionLevelRangeTest {
+
+    @Test
+    public void testCreateFailDueToInvalidParams() {
+        // min and max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(0, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(-1, -1));
+        // min can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(0, 1));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(-1, 1));
+        // max can't be < 1.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(1, 0));
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(1, -1));
+        // min can't be > max.
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new VersionLevelRange(2, 1));
+    }
+
+    @Test
+    public void testSerializeDeserialize() {
+        VersionLevelRange versionLevelRange = new VersionLevelRange(1, 2);
+        assertEquals(1, versionLevelRange.min());
+        assertEquals(2, versionLevelRange.max());
+
+        Map<String, Long> serialized = versionLevelRange.serialize();
+        assertEquals(
+            new HashMap<String, Long>() {
+                {
+                    put("min_version_level", versionLevelRange.min());
+                    put("max_version_level", versionLevelRange.max());
+                }
+            },
+            serialized
+        );
+
+        VersionLevelRange deserialized = 
VersionLevelRange.deserialize(serialized);
+        assertEquals(1, deserialized.min());
+        assertEquals(2, deserialized.max());
+        assertEquals(versionLevelRange, deserialized);
+    }
+
+    @Test
+    public void testDeserializationFailureTest() {
+        // min_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMinVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 0L);
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMinVersion));
+
+        // max_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 1L);
+                put("max_version_level", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMaxVersion));
+
+        // min_version_level and max_version_level can't be < 1.
+        Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 0L);
+                put("max_version_level", 0L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithBadMinMaxVersion));
+
+        // min_version_level can't be > max_version_level.
+        Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 2L);
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithLowerMaxVersion));
+
+        // min_version_level key missing.
+        Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("max_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithMinKeyMissing));
+
+        // max_version_level key missing.
+        Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, 
Long>() {
+            {
+                put("min_version_level", 1L);
+            }
+        };
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> VersionLevelRange.deserialize(invalidWithMaxKeyMissing));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("VersionLevelRange[1, 1]", new VersionLevelRange(1, 
1).toString());
+        assertEquals("VersionLevelRange[1, 2]", new VersionLevelRange(1, 
2).toString());
+    }
+
+    @Test
+    public void testEquals() {
+        assertTrue(new VersionLevelRange(1, 1).equals(new VersionLevelRange(1, 
1)));
+        assertFalse(new VersionLevelRange(1, 1).equals(new 
VersionLevelRange(1, 2)));
+    }
+
+    @Test
+    public void testIsCompatibleWith() {
+        assertTrue(new VersionLevelRange(1, 1).isCompatibleWith(new 
VersionRange(1, 1)));
+        assertTrue(new VersionLevelRange(2, 3).isCompatibleWith(new 
VersionRange(1, 4)));
+        assertTrue(new VersionLevelRange(1, 4).isCompatibleWith(new 
VersionRange(1, 4)));
+
+        assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new 
VersionRange(2, 3)));
+        assertFalse(new VersionLevelRange(1, 4).isCompatibleWith(new 
VersionRange(2, 4)));
+        assertFalse(new VersionLevelRange(2, 4).isCompatibleWith(new 
VersionRange(2, 3)));
+    }
+
+    @Test
+    public void testGetters() {
+        assertEquals(1, new VersionLevelRange(1, 2).min());
+        assertEquals(2, new VersionLevelRange(1, 2).max());
+    }
+}

Review comment:
       nit: new line

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the 
same.
+     *           The returned value can be deserialized using one of the 
deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionLevelRange>.
+     *
+     * @param serialized   the serialized representation of a 
Features<VersionLevelRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<VersionLevelRange> object
+     */
+    public static Features<VersionLevelRange> deserializeFinalizedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return finalizedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> VersionLevelRange.deserialize(entry.getValue()))));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionRange>.
+     *
+     * @param serialized   the serialized representation of a 
Features<VersionRange> object,
+     *                     generated using the serialize() API.
+     *
+     * @return             the deserialized Features<VersionRange> object
+     */
+    public static Features<VersionRange> deserializeSupportedFeatures(
+        Map<String, Map<String, Long>> serialized) {
+        return supportedFeatures(serialized.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> VersionRange.deserialize(entry.getValue()))));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {

Review comment:
       We should check `null` for other.

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);

Review comment:
       Also if we could potentially have a not-found feature, we should either 
fail with illegal state, or make the return type `Optional<VersionRangeType>`

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the 
same.
+     *           The returned value can be deserialized using one of the 
deserialize* APIs.
+     */
+    public Map<String, Map<String, Long>> serialize() {
+        return features.entrySet().stream().collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().serialize()));
+    }
+
+    /**
+     * Deserializes a map to Features<VersionLevelRange>.

Review comment:
       s/Deserializes/Deserialize

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,19 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{

Review comment:
       This is because the write path has not been implemented?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct 
struct, short version) {
         }
     }
 
-    public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, 
byte maxMagic) {
+    public static ApiVersionsResponse apiVersionsResponse(
+        int throttleTimeMs,
+        byte maxMagic,
+        Features<VersionRange> latestSupportedFeatures,
+        Optional<Features<VersionLevelRange>> finalizedFeatures,
+        Optional<Long> finalizedFeaturesEpoch) {
         if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == 
DEFAULT_THROTTLE_TIME) {
             return DEFAULT_API_VERSIONS_RESPONSE;
         }
-        return createApiVersionsResponse(throttleTimeMs, maxMagic);
+        return createApiVersionsResponse(
+            throttleTimeMs, maxMagic, latestSupportedFeatures, 
finalizedFeatures, finalizedFeaturesEpoch);
     }
 
-    public static ApiVersionsResponse createApiVersionsResponse(int 
throttleTimeMs, final byte minMagic) {
+    public static ApiVersionsResponse createApiVersionsResponse(

Review comment:
       Looks like we have some gaps for unit testing `ApiVersionsResponse`. 
Could we add unit tests for this class, since the logic 
`createApiVersionsResponse` becomes non-trivial now?

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);

Review comment:
       Same here

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -0,0 +1,93 @@
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, VersionLevelRange}
+
+// Raised whenever there was an error in updating the FinalizedFeatureCache 
with features.
+class FeatureCacheUpdateException(message: String) extends 
RuntimeException(message) {
+}
+
+// Helper class that represents finalized features along with an epoch value.
+case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], 
epoch: Int) {
+
+  def isValid(newEpoch: Int): Boolean = {
+    newEpoch >= epoch
+  }
+
+  override def toString(): String = {
+    "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch)
+  }
+}
+
+/**
+ * A mutable cache containing the latest finalized features and epoch. This 
cache is populated by a
+ * FinalizedFeatureChangeListener.
+ *
+ * Currently the main reader of this cache is the read path that serves an 
ApiVersionsRequest
+ * returning the features information in the response. In the future, as the 
feature versioning
+ * system in KIP-584 is used more widely, this cache could be read by other 
read paths trying to
+ * learn the finalized feature information.
+ */
+object FinalizedFeatureCache extends Logging {
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = 
Option.empty
+
+  /**
+   * @return   the latest known FinalizedFeaturesAndEpoch. If the returned 
value is empty, it means
+   *           no FinalizedFeaturesAndEpoch exists in the cache at the time 
when this
+   *           method is invoked. This result could change in the future 
whenever the
+   *           updateOrThrow method is invoked.
+   */
+  def get: Option[FinalizedFeaturesAndEpoch] = {
+    featuresAndEpoch
+  }
+
+  def empty: Boolean = {
+    featuresAndEpoch.isEmpty
+  }
+
+  /**
+   * Clears all existing finalized features and epoch from the cache.
+   */
+  def clear(): Unit = {
+    featuresAndEpoch = Option.empty
+    info("Cleared cache")
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch 
to latestEpoch.
+   * Raises an exception when the operation is not successful.
+   *
+   * @param latestFeatures   the latest finalized features to be set in the 
cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update 
operation fails
+   *                         due to invalid parameters or incompatibilities 
with the broker's
+   *                         supported features. In such a case, the existing 
cache contents are
+   *                         not modified.
+   */
+  def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: 
Int): Unit = {
+    updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch))
+  }
+
+  private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = {
+    val existingStr = featuresAndEpoch.map(existing => 
existing.toString).getOrElse("<empty>")
+    if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > 
latest.epoch) {

Review comment:
       s/ !featuresAndEpoch.isEmpty / featuresAndEpoch.isDefined

##########
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##########
@@ -0,0 +1,143 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ */
+public class Features<VersionRangeType extends VersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+     *                   for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features   Map of feature name to VersionRange, as the backing 
data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"supported" features.
+     */
+    public static Features<VersionRange> supportedFeatures(Map<String, 
VersionRange> features) {
+        return new Features<VersionRange>(features);
+    }
+
+    /**
+     * @param features   Map of feature name to VersionLevelRange, as the 
backing data structure
+     *                   for the Features object.
+     * @return           Returns a new Features object representing 
"finalized" features.
+     */
+    public static Features<VersionLevelRange> finalizedFeatures(Map<String, 
VersionLevelRange> features) {
+        return new Features<VersionLevelRange>(features);
+    }
+
+    public static Features<VersionLevelRange> emptyFinalizedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+    public static Features<VersionRange> emptySupportedFeatures() {
+        return new Features<>(new HashMap<>());
+    }
+
+
+    public Map<String, VersionRangeType> all() {
+        return features;
+    }
+
+    public boolean empty() {
+        return features.isEmpty();
+    }
+
+    public VersionRangeType get(String feature) {
+        return all().get(feature);
+    }
+
+    public String toString() {
+        return String.format(
+            "Features{%s}",
+            features
+                .entrySet()
+                .stream()
+                .map(entry -> String.format("(%s -> %s)", entry.getKey(), 
entry.getValue()))
+                .collect(joining(", "))
+        );
+    }
+
+    /**
+     * @return   Serializes the underlying features to a map, and returns the 
same.

Review comment:
       Maybe rephrase as `a map with the underlying features serialized`

##########
File path: clients/src/main/resources/common/message/ApiVersionsResponse.json
##########
@@ -42,6 +42,33 @@
         "about": "The maximum supported version, inclusive." }
     ]},
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
-      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." }
+      "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },

Review comment:
       I think we need to bump the schema version to 4? Same with 
`ApiVersionsRequest.json`

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, 
Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = 
Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.all());
+    }
+
+    @Test
+    public void testGetAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = 
Features.supportedFeatures(allFeatures);
+        assertEquals(v1, features.get("feature_1"));
+        assertEquals(v2, features.get("feature_2"));
+        assertNull(features.get("nonexistent_feature"));
+    }
+
+    @Test
+    public void testSerializeDeserializeSupportedFeatures() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {

Review comment:
       Same here

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;

Review comment:
       We need the apache license title

##########
File path: core/src/main/scala/kafka/cluster/Broker.scala
##########
@@ -34,14 +36,19 @@ object Broker {
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) 
extends AuthorizerServerInfo
+
+  def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = 
{
+    new Broker(id, endPoints, rack, emptySupportedFeatures)
+  }
 }
 
 /**
  * A Kafka broker.
- * A broker has an id, a collection of end-points, an optional rack and a 
listener to security protocol map.
+ * A broker has an id, a collection of end-points, an optional rack and a 
listener to security protocol map,

Review comment:
       nit: might make sense to build meta comment for parameters:
   ```
   /**
     * 
     * @param id
     * @param endPoints
     * @param rack
     * @param features
     */
   ```

##########
File path: 
clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
##########
@@ -0,0 +1,216 @@
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+public class FeaturesTest {
+
+    @Test
+    public void testEmptyFeatures() {
+        Map<String, Map<String, Long>> emptyMap = new HashMap<>();
+
+        Features<VersionLevelRange> emptyFinalizedFeatures = 
Features.emptyFinalizedFeatures();
+        assertEquals(new HashMap<>(), emptyFinalizedFeatures.all());
+        assertEquals(emptyMap, emptyFinalizedFeatures.serialize());
+        assertEquals(emptyFinalizedFeatures, 
Features.deserializeFinalizedFeatures(emptyMap));
+
+        Features<VersionRange> emptySupportedFeatures = 
Features.emptySupportedFeatures();
+        assertEquals(new HashMap<>(), emptySupportedFeatures.all());
+        assertEquals(
+            new HashMap<String, HashMap<String, Long>>(),
+            emptySupportedFeatures.serialize());
+        assertEquals(emptySupportedFeatures, 
Features.deserializeSupportedFeatures(emptyMap));
+    }
+
+    @Test
+    public void testAllAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {
+            {
+                put("feature_1", v1);
+                put("feature_2", v2);
+            }
+        };
+        Features<VersionRange> features = 
Features.supportedFeatures(allFeatures);
+        assertEquals(allFeatures, features.all());
+    }
+
+    @Test
+    public void testGetAPI() {
+        VersionRange v1 = new VersionRange(1, 2);
+        VersionRange v2 = new VersionRange(3, 4);
+        Map<String, VersionRange> allFeatures = new HashMap<String, 
VersionRange>() {

Review comment:
       We could use `org.apache.kafka.common.utils.Utils#mkMap` here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to