abbccdda commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r426884892
########## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionLevelRange.java ########## @@ -0,0 +1,39 @@ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of VersionRange representing a range of version levels. The main specialization + * is that the class uses different serialization keys for min/max attributes. + * + * NOTE: This is the backing class used to define the min/max version levels for finalized features. + */ +public class VersionLevelRange extends VersionRange { Review comment: In terms of naming, do you think `FinalizedVersionRange` is more explicit? Also when I look closer at the class hierarchy, I feel the sharing point between finalized version range and supported version range should be extracted to avoid weird inheritance. What I'm proposing is to have `VersionRange` as a super class with two subclasses: `SupportedVersionRange` and `FinalizedVersionRange`, and make `minKeyLabel` and `maxKeyLabel` abstract functions, WDYT? ########## File path: clients/src/test/java/org/apache/kafka/common/feature/VersionRangeTest.java ########## @@ -0,0 +1,150 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class VersionRangeTest { + @Test + public void testFailDueToInvalidParams() { + // min and max can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(0, 0)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(-1, -1)); + // min can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(0, 1)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(-1, 1)); + // max can't be < 1. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(1, 0)); + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(1, -1)); + // min can't be > max. + assertThrows( + IllegalArgumentException.class, + () -> new VersionRange(2, 1)); + } + + @Test + public void testSerializeDeserializeTest() { + VersionRange versionRange = new VersionRange(1, 2); + assertEquals(1, versionRange.min()); + assertEquals(2, versionRange.max()); + + Map<String, Long> serialized = versionRange.serialize(); + assertEquals( + new HashMap<String, Long>() { + { + put("min_version", versionRange.min()); + put("max_version", versionRange.max()); + } + }, + serialized + ); + + VersionRange deserialized = VersionRange.deserialize(serialized); + assertEquals(1, deserialized.min()); + assertEquals(2, deserialized.max()); + assertEquals(versionRange, deserialized); + } + + @Test + public void testDeserializationFailureTest() { + // min_version can't be < 1. + Map<String, Long> invalidWithBadMinVersion = new HashMap<String, Long>() { + { + put("min_version", 0L); + put("max_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithBadMinVersion)); + + // max_version can't be < 1. + Map<String, Long> invalidWithBadMaxVersion = new HashMap<String, Long>() { + { + put("min_version", 1L); + put("max_version", 0L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithBadMaxVersion)); + + // min_version and max_version can't be < 1. + Map<String, Long> invalidWithBadMinMaxVersion = new HashMap<String, Long>() { + { + put("min_version", 0L); + put("max_version", 0L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithBadMinMaxVersion)); + + // min_version can't be > max_version. + Map<String, Long> invalidWithLowerMaxVersion = new HashMap<String, Long>() { + { + put("min_version", 2L); + put("max_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithLowerMaxVersion)); + + // min_version key missing. + Map<String, Long> invalidWithMinKeyMissing = new HashMap<String, Long>() { + { + put("max_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithMinKeyMissing)); + + // max_version key missing. + Map<String, Long> invalidWithMaxKeyMissing = new HashMap<String, Long>() { + { + put("min_version", 1L); + } + }; + assertThrows( + IllegalArgumentException.class, + () -> VersionRange.deserialize(invalidWithMaxKeyMissing)); + } + + @Test + public void testToString() { + assertEquals("VersionRange[1, 1]", new VersionRange(1, 1).toString()); + assertEquals("VersionRange[1, 2]", new VersionRange(1, 2).toString()); + } + + @Test + public void testEquals() { + assertTrue(new VersionRange(1, 1).equals(new VersionRange(1, 1))); + assertFalse(new VersionRange(1, 1).equals(new VersionRange(1, 2))); + } + + @Test + public void testGetters() { Review comment: nit: testMinMax, and we could reuse the same `new VersionRange(1, 2)` by only creating it once. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java ########## @@ -0,0 +1,104 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an immutable basic version range using 2 attributes: min and max of type long. + * The min and max attributes are expected to be >= 1, and with max >= min. + * + * The class also provides API to serialize/deserialize the version range to/from a map. + * The class allows for configurable labels for the min/max attributes, which can be specialized by + * sub-classes (if needed). + * + * NOTE: This is the backing class used to define the min/max versions for supported features. + */ +public class VersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_KEY_LABEL = "min_version"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. + private static final String MAX_VERSION_KEY_LABEL = "max_version"; + + private final String minKeyLabel; + + private final long minValue; + + private final String maxKeyLabel; + + private final long maxValue; + + protected VersionRange(String minKey, long minValue, String maxKeyLabel, long maxValue) { + if (minValue < 1 || maxValue < 1 || maxValue < minValue) { + throw new IllegalArgumentException( + String.format( + "Expected minValue > 1, maxValue > 1 and maxValue >= minValue, but received" + + " minValue: %d, maxValue: %d", minValue, maxValue)); + } + this.minKeyLabel = minKey; + this.minValue = minValue; + this.maxKeyLabel = maxKeyLabel; + this.maxValue = maxValue; + } + + public VersionRange(long minVersion, long maxVersion) { + this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, maxVersion); + } + + public long min() { + return minValue; + } + + public long max() { + return maxValue; + } + + public String toString() { + return String.format("%s[%d, %d]", this.getClass().getSimpleName(), min(), max()); + } + + public Map<String, Long> serialize() { + return new HashMap<String, Long>() { + { + put(minKeyLabel, min()); + put(maxKeyLabel, max()); + } + }; + } + + public static VersionRange deserialize(Map<String, Long> serialized) { + return new VersionRange( + valueOrThrow(MIN_VERSION_KEY_LABEL, serialized), + valueOrThrow(MAX_VERSION_KEY_LABEL, serialized)); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof VersionRange)) { Review comment: Need to check null ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. Review comment: Could we add a reference to the class? ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -81,17 +83,27 @@ object BrokerIdsZNode { object BrokerInfo { /** - * Create a broker info with v4 json format (which includes multiple endpoints and rack) if - * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above. + * - Create a broker info with v4 json format (which includes multiple endpoints and rack) if + * the apiVersion is 0.10.0.X or above but lesser than 2.6.x. + * - Register the broker with v2 json format otherwise. * * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2. * - * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to - * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case). + * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above + * without having to upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in + * any case). */ def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo = { - // see method documentation for the reason why we do this - val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 + val version = { Review comment: I don't think we need a nested if-else: ``` val version = { if (apiVersion >= KAFKA_2_6_IV1) 5 else if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 } ``` ########## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ########## @@ -149,6 +153,53 @@ class BrokerEndPointTest { assertEquals(None, broker.rack) } + @Test + def testFromJsonV5(): Unit = { + val json = """{ + "version":5, + "host":"localhost", + "port":9092, + "jmx_port":9999, + "timestamp":"2233345666", + "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"], + "listener_security_protocol_map":{"CLIENT":"SSL", "REPLICATION":"PLAINTEXT"}, + "rack":"dc1", + "features": {"feature1": {"min_version": 1, "max_version": 2}, "feature2": {"min_version": 2, "max_version": 4}} + }""" + val broker = parseBrokerJson(1, json) + assertEquals(1, broker.id) + val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT")) + assertEquals("host1", brokerEndPoint.host) + assertEquals(9092, brokerEndPoint.port) + assertEquals(Some("dc1"), broker.rack) + assertEquals(Features.supportedFeatures( + Map[String, VersionRange]( + "feature1" -> new VersionRange(1, 2), + "feature2" -> new VersionRange(2, 4)).asJava), + broker.features) + } + + @Test + def testFromJsonV4WithNoFeatures(): Unit = { Review comment: nit: This test could move closer to testFromJsonV4WithNoRack ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") + if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) { + val errorMsg = ("FinalizedFeatureCache update failed due to invalid epoch in new finalized %s." + + " The existing finalized is %s").format(latest, existingStr) + throw new FeatureCacheUpdateException(errorMsg) + } else { + val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + if (incompatibleFeatures.nonEmpty) { + val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" + Review comment: nit: this errorMsg val seems redundant. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,200 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. + object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + processNotification() Review comment: Does this event actually happen? Will we hit illegal state exception in `updateLatestOrThrow`? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -113,14 +141,26 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) { } } - public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) { + public static ApiVersionsResponse apiVersionsResponse( Review comment: Note this function is public, which suggests there could be external dependency that we need to take care of. The safer approach is to keep this static function and create a separate one with augmented parameters. cc @ijuma for validation. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<VersionRange> supportedFeatures(Map<String, VersionRange> features) { + return new Features<VersionRange>(features); + } + + /** + * @param features Map of feature name to VersionLevelRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<VersionLevelRange> finalizedFeatures(Map<String, VersionLevelRange> features) { + return new Features<VersionLevelRange>(features); + } + + public static Features<VersionLevelRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<VersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + + public Map<String, VersionRangeType> all() { Review comment: I gave it more thought, and wonder whether we could just call this function `features` to be more consistent with our convention for getters. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/VersionLevelRangeTest.java ########## @@ -0,0 +1,162 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class VersionLevelRangeTest { + + @Test + public void testCreateFailDueToInvalidParams() { + // min and max can't be < 1. Review comment: Does L17-23 really necessary for testing? ########## File path: clients/src/main/java/org/apache/kafka/common/feature/VersionRange.java ########## @@ -0,0 +1,104 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an immutable basic version range using 2 attributes: min and max of type long. + * The min and max attributes are expected to be >= 1, and with max >= min. + * + * The class also provides API to serialize/deserialize the version range to/from a map. + * The class allows for configurable labels for the min/max attributes, which can be specialized by + * sub-classes (if needed). + * + * NOTE: This is the backing class used to define the min/max versions for supported features. + */ +public class VersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_KEY_LABEL = "min_version"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. + private static final String MAX_VERSION_KEY_LABEL = "max_version"; + + private final String minKeyLabel; + + private final long minValue; + + private final String maxKeyLabel; + + private final long maxValue; + + protected VersionRange(String minKey, long minValue, String maxKeyLabel, long maxValue) { + if (minValue < 1 || maxValue < 1 || maxValue < minValue) { + throw new IllegalArgumentException( + String.format( + "Expected minValue > 1, maxValue > 1 and maxValue >= minValue, but received" + + " minValue: %d, maxValue: %d", minValue, maxValue)); + } + this.minKeyLabel = minKey; + this.minValue = minValue; + this.maxKeyLabel = maxKeyLabel; + this.maxValue = maxValue; + } + + public VersionRange(long minVersion, long maxVersion) { + this(MIN_VERSION_KEY_LABEL, minVersion, MAX_VERSION_KEY_LABEL, maxVersion); + } + + public long min() { + return minValue; + } + + public long max() { + return maxValue; + } + + public String toString() { + return String.format("%s[%d, %d]", this.getClass().getSimpleName(), min(), max()); + } + + public Map<String, Long> serialize() { + return new HashMap<String, Long>() { + { + put(minKeyLabel, min()); + put(maxKeyLabel, max()); + } + }; + } + + public static VersionRange deserialize(Map<String, Long> serialized) { + return new VersionRange( + valueOrThrow(MIN_VERSION_KEY_LABEL, serialized), + valueOrThrow(MAX_VERSION_KEY_LABEL, serialized)); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof VersionRange)) { + return false; + } + + final VersionRange that = (VersionRange) other; + return Objects.equals(this.minKeyLabel, that.minKeyLabel) && Review comment: Is there a difference between `Objects.equals` and `this.minKeyLabel.equals(that.minKeyLabel)`? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ########## @@ -77,6 +93,18 @@ public boolean shouldClientThrottle(short version) { return version >= 2; } + public SupportedFeatureKey supportedFeature(String featureName) { Review comment: I think we could delay the addition for these helpers until we actually need them. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { Review comment: Is this function being used? ########## File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala ########## @@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo createRecursive(path, data = null, throwIfPathExists = false) } + // Visible for testing. + def createFeatureZNode(nodeContents: FeatureZNode): Unit = { Review comment: Do you expect these helper functions actually to be used in production logic with subsequent PRs? ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. Review comment: It seems that we don't have the handling logic for this FeatureCacheUpdateException. Do we think this is fatal? ########## File path: core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ########## @@ -149,6 +153,53 @@ class BrokerEndPointTest { assertEquals(None, broker.rack) } + @Test + def testFromJsonV5(): Unit = { Review comment: What would happen if we are dealing with a V4 json map containing features? ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -225,7 +255,12 @@ object BrokerIdZNode { } val rack = brokerInfo.get(RackKey).flatMap(_.to[Option[String]]) - BrokerInfo(Broker(id, endpoints, rack), version, jmxPort) + val features = FeatureZNode.asJavaMap(brokerInfo Review comment: Could we make feature extraction as a helper function? ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. Review comment: Might worth getting a ticket to define the handling strategy for such exception, and in general how `updateOrThrow` will be used. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") Review comment: s/existingStr/oldFeatureAndEpoch ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[VersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[VersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be incompatible between the latest features supported + * by the Broker, and the provided cluster-wide finalized features. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The set of incompatible feature names. If the returned set is empty, it + * means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[VersionLevelRange]): Set[String] = { + val supported = get Review comment: This is only used on L53, maybe we could just use supportedFeatures instead ########## File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureCacheTest.scala ########## @@ -0,0 +1,95 @@ +package kafka.server + +import org.apache.kafka.common.feature.{Features, VersionLevelRange, VersionRange} +import org.junit.Assert.{assertEquals, assertThrows, assertTrue} +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +class FinalizedFeatureCacheTest { + + @Before + def setUp(): Unit = { + FinalizedFeatureCache.clear() + SupportedFeatures.clear() + } + + @Test + def testEmpty(): Unit = { + assertTrue(FinalizedFeatureCache.get.isEmpty) + } + + @Test + def testUpdateOrThrowFailedDueToInvalidEpoch(): Unit = { + val supportedFeatures = Map[String, VersionRange]( + "feature_1" -> new VersionRange(1, 4)) + SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava)) + + val features = Map[String, VersionLevelRange]( + "feature_1" -> new VersionLevelRange(1, 4)) + val finalizedFeatures = Features.finalizedFeatures(features.asJava) + + FinalizedFeatureCache.updateOrThrow(finalizedFeatures, 10) + assertEquals(finalizedFeatures, FinalizedFeatureCache.get.get.features) Review comment: Should we test `isDefined` before calling `get`? ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") + if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) { + val errorMsg = ("FinalizedFeatureCache update failed due to invalid epoch in new finalized %s." + + " The existing finalized is %s").format(latest, existingStr) + throw new FeatureCacheUpdateException(errorMsg) + } else { + val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + if (incompatibleFeatures.nonEmpty) { + val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" + + " checks failed! Supported %s has incompatibilities with the latest finalized %s." + + " The incompatible features are: %s.").format( + SupportedFeatures.get, latest, incompatibleFeatures) + throw new FeatureCacheUpdateException(errorMsg) + } + } + val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format( Review comment: This val seems redundant. ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[VersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[VersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be incompatible between the latest features supported + * by the Broker, and the provided cluster-wide finalized features. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The set of incompatible feature names. If the returned set is empty, it + * means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[VersionLevelRange]): Set[String] = { + val supported = get + + val incompatibilities = finalized.all.asScala.collect { + case (feature, versionLevels) => { + val supportedVersions = supported.get(feature); + if (supportedVersions == null) { + (feature, "{feature=%s, reason='Unsupported feature'}".format(feature)) + } else if (!versionLevels.isCompatibleWith(supportedVersions)) { Review comment: nit: maybe rename to `incompatibleWith` and flip the boolean ########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP /* setup zookeeper */ initZkClient(time) + /* initialize features */ + _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient) + if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) { + // The feature versioning system (KIP-584) is active only when: + // config.interBrokerProtocolVersion is >= KAFKA_2_6_IV1. + _featureChangeListener.initOrThrow(60000) Review comment: Could we make this parameter configurable? ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,70 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionRange, VersionLevelRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[VersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[VersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be incompatible between the latest features supported + * by the Broker, and the provided cluster-wide finalized features. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. Review comment: This comment is a bit vague to me, what are you referring by `incompatibilities`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org