kowshik commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r428573407
########## File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Represents an immutable basic version range using 2 attributes: min and max of type long. + * The min and max attributes are expected to be >= 1, and with max >= min. + * + * The class also provides API to serialize/deserialize the version range to/from a map. + * The class allows for configurable labels for the min/max attributes, which can be specialized by + * sub-classes (if needed). + */ +class BaseVersionRange { Review comment: It is thoroughly tested in its child class's test suite: `SupportedVersionRangeTest`. Personally, I feel this is good enough, because to test this class directly we would need to subclass it anyway (since the constructor is `protected`). By testing via `SupportedVersionRangeTest`, we achieve exactly the same. I have now added top-level documentation to the `SupportedVersionRangeTest` test suite explaining the above. 
########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + * @see SupportedVersionRange + * @see FinalizedVersionRange + */ +public class Features<VersionRangeType extends BaseVersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). 
+ * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. + */ + public static Features<SupportedVersionRange> supportedFeatures(Map<String, SupportedVersionRange> features) { + return new Features<>(features); + } + + /** + * @param features Map of feature name to FinalizedVersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) { + return new Features<>(features); + } + + // Visible for testing. + public static Features<FinalizedVersionRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<SupportedVersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + public Map<String, VersionRangeType> features() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + /** + * @param feature name of the feature + * + * @return the VersionRangeType corresponding to the feature name, or null if absent + */ + public VersionRangeType get(String feature) { + return features.get(feature); + } + + public String toString() { + return String.format( + "Features{%s}", + features + .entrySet() + .stream() + .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue())) + .collect(joining(", ")) + ); + } + + /** + * @return A map with underlying features serialized. The returned value can be deserialized + * using one of the deserialize* APIs. Review comment: Done. 
########## File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of {@link BaseVersionRange} representing a range of version levels. + * NOTE: This is the backing class used to define the min/max version levels for finalized features. + */ +public class FinalizedVersionRange extends BaseVersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. 
+ private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level"; + + public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) { + super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel); + } + + public static FinalizedVersionRange deserialize(Map<String, Long> serialized) { + return new FinalizedVersionRange( + BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, serialized), + BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, serialized)); + } + + private boolean isCompatibleWith(BaseVersionRange versionRange) { + return min() >= versionRange.min() && max() <= versionRange.max(); + } + + /** + * Checks if the [min, max] version level range of this object does *NOT* fall within the + * [min, max] version range of the provided SupportedVersionRange parameter. + * + * @param versionRange the SupportedVersionRange to be checked Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,143 @@ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. 
+ */ +public class Features<VersionRangeType extends VersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; Review comment: Done. I'm raising an exception now if it is `null`. I see your point. Will be good to learn what is the convention in Kafka for constructor param null checks. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of {@link BaseVersionRange} representing a range of version levels. + * NOTE: This is the backing class used to define the min/max version levels for finalized features. 
+ */ +public class FinalizedVersionRange extends BaseVersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. + private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level"; + + public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) { + super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel); + } + + public static FinalizedVersionRange deserialize(Map<String, Long> serialized) { + return new FinalizedVersionRange( + BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, serialized), + BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, serialized)); + } + + private boolean isCompatibleWith(BaseVersionRange versionRange) { Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/SupportedVersionRange.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of VersionRange representing a range of versions. + * NOTE: This is the backing class used to define the min/max versions for supported features. Review comment: Done. Good point! ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification Review comment: Done. 
########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. 
+ */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait Review comment: Done. Removed. 
########## File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala ########## @@ -0,0 +1,228 @@ +package kafka.server + +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, ZooKeeperTestHarness} +import kafka.utils.{Exit, TestUtils} +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.internals.FatalExitError +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} +import org.junit.{Before, Test} + +import scala.concurrent.TimeoutException +import scala.jdk.CollectionConverters._ + +class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness { Review comment: Done. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. 
+ * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. 
+ * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. 
+ object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + // This event may happen, rarely (ex: operational error). + // In such a case, we prefer to just log a warning and treat the case as if the node is absent, + // and populate the FinalizedFeatureCache with empty finalized features. + processNotification() + } + } + + private val queue = new LinkedBlockingQueue[FeatureCacheUpdater] + + private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread") + + /** + * This method initializes the feature ZK node change listener. Optionally, it also ensures to + * update the FinalizedFeatureCache once with the latest contents of the feature ZK node + * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers + * are conveniently detected before the initOrThrow() method returns to the caller. If feature + * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker + * would exit eventually. + * + * @param waitOnceForCacheUpdateMs # of milli seconds to wait for feature cache to be updated once. + * If this parameter <= 0, no wait operation happens. 
+ * + * @throws Exception if feature incompatibility check could not be finished in a timely manner + */ + def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = { + thread.start() + zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler) + + if (waitOnceForCacheUpdateMs > 0) { + val barrier = new CountDownLatch(1) + val ensureCacheUpdateOnce = new FeatureCacheUpdater(FeatureZNodeChangeHandler.path, Some(barrier)) + queue.add(ensureCacheUpdateOnce) + try { + ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs) + } catch { + case e: Exception => { + close() + throw e + } + } + } + } + + /** + * Closes the feature ZK node change listener by unregistering the listener from ZK client, + * clearing the queue and shutting down the ChangeNotificationProcessorThread. + */ + def close(): Unit = { + zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path) Review comment: The order probably doesn't matter in this case. But logically I decided to follow the below order since I could reason about it better: 1. Stop the inflow of new events 2. Clear pending events 3. Stop the processing of all events ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. 
+ * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + 
maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. + object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { Review comment: Done. Removed. 
########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. 
+ */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { Review comment: I have added documentation here in this method describing all the cases. The empty data case should never happen and can indicate a corruption. The reason is that we always return non-empty data in `FeatureZNode.encode`, so the ZK node content should never be empty. Yes, I can add some more info to KAFKA-10028 or in the write path PR summary. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,93 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, VersionLevelRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. +case class FinalizedFeaturesAndEpoch(features: Features[VersionLevelRange], epoch: Int) { + + def isValid(newEpoch: Int): Boolean = { + newEpoch >= epoch + } + + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * FinalizedFeatureChangeListener. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest + * returning the features information in the response. 
In the future, as the feature versioning + * system in KIP-584 is used more widely, this cache could be read by other read paths trying to + * learn the finalized feature information. + */ +object FinalizedFeatureCache extends Logging { + @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty + + /** + * @return the latest known FinalizedFeaturesAndEpoch. If the returned value is empty, it means + * no FinalizedFeaturesAndEpoch exists in the cache at the time when this + * method is invoked. This result could change in the future whenever the + * updateOrThrow method is invoked. + */ + def get: Option[FinalizedFeaturesAndEpoch] = { + featuresAndEpoch + } + + def empty: Boolean = { + featuresAndEpoch.isEmpty + } + + /** + * Clears all existing finalized features and epoch from the cache. + */ + def clear(): Unit = { + featuresAndEpoch = Option.empty + info("Cleared cache") + } + + /** + * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch. + * Raises an exception when the operation is not successful. + * + * @param latestFeatures the latest finalized features to be set in the cache + * @param latestEpoch the latest epoch value to be set in the cache + * + * @throws FeatureCacheUpdateException if the cache update operation fails + * due to invalid parameters or incompatibilities with the broker's + * supported features. In such a case, the existing cache contents are + * not modified. + */ + def updateOrThrow(latestFeatures: Features[VersionLevelRange], latestEpoch: Int): Unit = { + updateOrThrow(FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)) + } + + private def updateOrThrow(latest: FinalizedFeaturesAndEpoch): Unit = { + val existingStr = featuresAndEpoch.map(existing => existing.toString).getOrElse("<empty>") + if (!featuresAndEpoch.isEmpty && featuresAndEpoch.get.epoch > latest.epoch) { + val errorMsg = ("FinalizedFeatureCache update failed due to invalid epoch in new finalized %s." 
+ + " The existing finalized is %s").format(latest, existingStr) + throw new FeatureCacheUpdateException(errorMsg) + } else { + val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + if (incompatibleFeatures.nonEmpty) { + val errorMsg = ("FinalizedFeatureCache updated failed since feature compatibility" + + " checks failed! Supported %s has incompatibilities with the latest finalized %s." + + " The incompatible features are: %s.").format( + SupportedFeatures.get, latest, incompatibleFeatures) + throw new FeatureCacheUpdateException(errorMsg) + } + } + val logMsg = "Updated cache from existing finalized %s to latest finalized %s".format( Review comment: Done. Good point! ########## File path: core/src/test/scala/unit/kafka/server/FinalizedFeatureChangeListenerTest.scala ########## @@ -0,0 +1,228 @@ +package kafka.server + +import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion, ZooKeeperTestHarness} +import kafka.utils.{Exit, TestUtils} +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.internals.FatalExitError +import org.junit.Assert.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue} +import org.junit.{Before, Test} + +import scala.concurrent.TimeoutException +import scala.jdk.CollectionConverters._ + +class FinalizedFeatureChangeListenerTest extends ZooKeeperTestHarness { + @Before + override def setUp(): Unit = { + super.setUp() + FinalizedFeatureCache.clear() + SupportedFeatures.clear() + } + + /** + * Tests that the listener can be initialized, and that it can listen to ZK notifications + * successfully from an "Enabled" FeatureZNode (the ZK data has no feature incompatibilities). 
+ */ + @Test + def testInitSuccessAndNotificationSuccess(): Unit = { + val supportedFeatures = Map[String, SupportedVersionRange]( + "feature_1" -> new SupportedVersionRange(1, 4), + "feature_2" -> new SupportedVersionRange(1, 3)) + SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava)) + + val initialFinalizedFeaturesMap = Map[String, FinalizedVersionRange]( + "feature_1" -> new FinalizedVersionRange(2, 3)) + val initialFinalizedFeatures = Features.finalizedFeatures(initialFinalizedFeaturesMap.asJava) + zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, initialFinalizedFeatures)) + val (mayBeFeatureZNodeBytes, initialVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + assertNotEquals(initialVersion, ZkVersion.UnknownVersion) + assertFalse(mayBeFeatureZNodeBytes.isEmpty) + + val listener = new FinalizedFeatureChangeListener(zkClient) + assertFalse(listener.isListenerInitiated) + assertTrue(FinalizedFeatureCache.empty) + listener.initOrThrow(15000) + assertTrue(listener.isListenerInitiated) + val mayBeNewCacheContent = FinalizedFeatureCache.get + assertFalse(mayBeNewCacheContent.isEmpty) + val newCacheContent = mayBeNewCacheContent.get + assertEquals(initialFinalizedFeatures, newCacheContent.features) + assertEquals(initialVersion, newCacheContent.epoch) + + val updatedFinalizedFeaturesMap = Map[String, FinalizedVersionRange]( + "feature_1" -> new FinalizedVersionRange(2, 4)) + val updatedFinalizedFeatures = Features.finalizedFeatures(updatedFinalizedFeaturesMap.asJava) + zkClient.updateFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, updatedFinalizedFeatures)) + val (mayBeFeatureZNodeNewBytes, updatedVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + assertNotEquals(updatedVersion, ZkVersion.UnknownVersion) + assertFalse(mayBeFeatureZNodeNewBytes.isEmpty) + assertTrue(updatedVersion > initialVersion) + TestUtils.waitUntilTrue(() => { + 
FinalizedFeatureCache.get.get.equals(FinalizedFeaturesAndEpoch(updatedFinalizedFeatures, updatedVersion)) + }, "Timed out waiting for FinalizedFeatureCache to be updated with new features") + assertTrue(listener.isListenerInitiated) + } + + /** + * Tests that the listener can be initialized, and that it can process FeatureZNode deletion + * successfully. + */ + @Test + def testFeatureZNodeDeleteNotificationProcessing(): Unit = { + val supportedFeatures = Map[String, SupportedVersionRange]( + "feature_1" -> new SupportedVersionRange(1, 4), + "feature_2" -> new SupportedVersionRange(1, 3)) + SupportedFeatures.update(Features.supportedFeatures(supportedFeatures.asJava)) + + val initialFinalizedFeaturesMap = Map[String, FinalizedVersionRange]( + "feature_1" -> new FinalizedVersionRange(2, 3)) + val initialFinalizedFeatures = Features.finalizedFeatures(initialFinalizedFeaturesMap.asJava) + zkClient.createFeatureZNode(FeatureZNode(FeatureZNodeStatus.Enabled, initialFinalizedFeatures)) + val (mayBeFeatureZNodeBytes, initialVersion) = zkClient.getDataAndVersion(FeatureZNode.path) + assertNotEquals(initialVersion, ZkVersion.UnknownVersion) + assertFalse(mayBeFeatureZNodeBytes.isEmpty) + + val listener = new FinalizedFeatureChangeListener(zkClient) + assertFalse(listener.isListenerInitiated) + assertTrue(FinalizedFeatureCache.empty) + listener.initOrThrow(15000) + assertTrue(listener.isListenerInitiated) + val mayBeNewCacheContent = FinalizedFeatureCache.get + assertFalse(mayBeNewCacheContent.isEmpty) + val newCacheContent = mayBeNewCacheContent.get + assertEquals(initialFinalizedFeatures, newCacheContent.features) + assertEquals(initialVersion, newCacheContent.epoch) + + zkClient.deleteFeatureZNode() Review comment: No. But we want to test the behavior about what happens during a deletion (ex: operational error). 
########## File path: clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java ########## @@ -0,0 +1,135 @@ +package org.apache.kafka.common.feature; + +import java.util.Map; + +import org.junit.Test; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class FinalizedVersionRangeTest { Review comment: Done. I have simplified this test suite eliminating the redundant tests, and only keeping the ones specific to `FinalizedVersionRange`. Also I have added documentation to both test suites explaining their purpose. ########## File path: clients/src/test/java/org/apache/kafka/common/feature/FinalizedVersionRangeTest.java ########## @@ -0,0 +1,135 @@ +package org.apache.kafka.common.feature; Review comment: Done. ########## File path: clients/src/main/java/org/apache/kafka/common/feature/FinalizedVersionRange.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.feature; + +import java.util.Map; + +/** + * A specialization of {@link BaseVersionRange} representing a range of version levels. + * NOTE: This is the backing class used to define the min/max version levels for finalized features. + */ +public class FinalizedVersionRange extends BaseVersionRange { + // Label for the min version key, that's used only for serialization/deserialization purposes. + private static final String MIN_VERSION_LEVEL_KEY_LABEL = "min_version_level"; + + // Label for the max version key, that's used only for serialization/deserialization purposes. + private static final String MAX_VERSION_LEVEL_KEY_LABEL = "max_version_level"; + + public FinalizedVersionRange(long minVersionLevel, long maxVersionLevel) { + super(MIN_VERSION_LEVEL_KEY_LABEL, minVersionLevel, MAX_VERSION_LEVEL_KEY_LABEL, maxVersionLevel); + } + + public static FinalizedVersionRange deserialize(Map<String, Long> serialized) { + return new FinalizedVersionRange( + BaseVersionRange.valueOrThrow(MIN_VERSION_LEVEL_KEY_LABEL, serialized), + BaseVersionRange.valueOrThrow(MAX_VERSION_LEVEL_KEY_LABEL, serialized)); + } + + private boolean isCompatibleWith(BaseVersionRange versionRange) { + return min() >= versionRange.min() && max() <= versionRange.max(); Review comment: Done. Good point! ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ########## @@ -0,0 +1,88 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange} + +// Raised whenever there was an error in updating the FinalizedFeatureCache with features. +class FeatureCacheUpdateException(message: String) extends RuntimeException(message) { +} + +// Helper class that represents finalized features along with an epoch value. 
+case class FinalizedFeaturesAndEpoch(features: Features[FinalizedVersionRange], epoch: Int) { + override def toString(): String = { + "FinalizedFeaturesAndEpoch(features=%s, epoch=%s)".format(features, epoch) + } +} + +/** + * A mutable cache containing the latest finalized features and epoch. This cache is populated by a + * {@link FinalizedFeatureChangeListener}. + * + * Currently the main reader of this cache is the read path that serves an ApiVersionsRequest Review comment: Done. ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,74 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[SupportedVersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[SupportedVersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be 'incompatible'. 
+ * A feature incompatibility is a version mismatch between the latest feature supported by the + * Broker, and the provided cluster-wide finalized feature. This can happen because a provided + * cluster-wide finalized feature: + * 1) Does not exist in the Broker (i.e. it is unknown to the Broker). + * [OR] + * 2) Exists but the FinalizedVersionRange does not match with the supported feature's SupportedVersionRange. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The set of incompatible feature names. If the returned set is empty, it + * means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Set[String] = { + val incompatibilities = finalized.features.asScala.collect { + case (feature, versionLevels) => { + val supportedVersions = supportedFeatures.get(feature); + if (supportedVersions == null) { + (feature, "{feature=%s, reason='Unsupported feature'}".format(feature)) + } else if (versionLevels.isIncompatibleWith(supportedVersions)) { + (feature, "{feature=%s, reason='Finalized %s is incompatible with supported %s'}".format( + feature, versionLevels, supportedVersions)) + } else { + (feature, null) + } + } + }.filter(entry => entry._2 != null) + + if (incompatibilities.nonEmpty) { Review comment: Done. Removed extra logging in the caller of this method (see `FinalizedFeatureCache`). ########## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.feature; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.Objects; + +import static java.util.stream.Collectors.joining; + +/** + * Represents an immutable dictionary with key being feature name, and value being VersionRangeType. + * Also provides API to serialize/deserialize the features and their version ranges to/from a map. + * + * This class can be instantiated only using its factory functions, with the important ones being: + * Features.supportedFeatures(...) and Features.finalizedFeatures(...). + * + * @param <VersionRangeType> is the type of version range. + * @see SupportedVersionRange + * @see FinalizedVersionRange + */ +public class Features<VersionRangeType extends BaseVersionRange> { + private final Map<String, VersionRangeType> features; + + /** + * Constructor is made private, as for readability it is preferred the caller uses one of the + * static factory functions for instantiation (see below). + * + * @param features Map of feature name to type of VersionRange, as the backing data structure + * for the Features object. + */ + private Features(Map<String, VersionRangeType> features) { + this.features = features; + } + + /** + * @param features Map of feature name to VersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "supported" features. 
+ */ + public static Features<SupportedVersionRange> supportedFeatures(Map<String, SupportedVersionRange> features) { + return new Features<>(features); + } + + /** + * @param features Map of feature name to FinalizedVersionRange, as the backing data structure + * for the Features object. + * @return Returns a new Features object representing "finalized" features. + */ + public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) { + return new Features<>(features); + } + + // Visible for testing. + public static Features<FinalizedVersionRange> emptyFinalizedFeatures() { + return new Features<>(new HashMap<>()); + } + + public static Features<SupportedVersionRange> emptySupportedFeatures() { + return new Features<>(new HashMap<>()); + } + + public Map<String, VersionRangeType> features() { + return features; + } + + public boolean empty() { + return features.isEmpty(); + } + + /** + * @param feature name of the feature + * + * @return the VersionRangeType corresponding to the feature name, or null if absent + */ + public VersionRangeType get(String feature) { + return features.get(feature); + } + + public String toString() { + return String.format( + "Features{%s}", + features + .entrySet() + .stream() + .map(entry -> String.format("(%s -> %s)", entry.getKey(), entry.getValue())) + .collect(joining(", ")) + ); + } + + /** + * @return A map with underlying features serialized. The returned value can be deserialized + * using one of the deserialize* APIs. + */ + public Map<String, Map<String, Long>> serialize() { + return features.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().serialize())); + } + + /** + * Deserialize a map to Features<FinalizedVersionRange>. + * + * @param serialized the serialized representation of a Features<FinalizedVersionRange> object, + * generated using the serialize() API. 
+ * + * @return the deserialized Features<FinalizedVersionRange> object + */ + public static Features<FinalizedVersionRange> deserializeFinalizedFeatures( + Map<String, Map<String, Long>> serialized) { + return finalizedFeatures(serialized.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> FinalizedVersionRange.deserialize(entry.getValue())))); + } + + /** + * Deserializes a map to Features<SupportedVersionRange>. + * + * @param serialized the serialized representation of a Features<SupportedVersionRange> object, + * generated using the serialize() API. + * + * @return the deserialized Features<SupportedVersionRange> object + */ + public static Features<SupportedVersionRange> deserializeSupportedFeatures( Review comment: Done. ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -744,6 +782,90 @@ object DelegationTokenInfoZNode { def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes) } +object FeatureZNodeStatus extends Enumeration { + val Disabled, Enabled = Value + + def withNameOpt(value: Int): Option[Value] = { + values.find(_.id == value) + } +} + +case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) { +} + +object FeatureZNode { + private val VersionKey = "version" + private val StatusKey = "status" + private val FeaturesKey = "features" + + // Version0 contains 'version', 'status' and 'features' keys. + val Version0 = 0 Review comment: Done. 
########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -744,6 +782,90 @@ object DelegationTokenInfoZNode { def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes) } +object FeatureZNodeStatus extends Enumeration { + val Disabled, Enabled = Value + + def withNameOpt(value: Int): Option[Value] = { + values.find(_.id == value) + } +} + +case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) { +} + +object FeatureZNode { + private val VersionKey = "version" + private val StatusKey = "status" + private val FeaturesKey = "features" + + // Version0 contains 'version', 'status' and 'features' keys. + val Version0 = 0 + val CurrentVersion = Version0 + + def path = "/feature" + + def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, util.Map[String, java.lang.Long]] = { + scalaMap + .view.mapValues(_.view.mapValues(scalaLong => java.lang.Long.valueOf(scalaLong)).toMap.asJava) + .toMap + .asJava + } + + def encode(featureZNode: FeatureZNode): Array[Byte] = { + val jsonMap = collection.mutable.Map( + VersionKey -> CurrentVersion, + StatusKey -> featureZNode.status.id, + FeaturesKey -> featureZNode.features.serialize) + Json.encodeAsBytes(jsonMap.asJava) + } + + def decode(jsonBytes: Array[Byte]): FeatureZNode = { + Json.tryParseBytes(jsonBytes) match { + case Right(js) => + val featureInfo = js.asJsonObject + val version = featureInfo(VersionKey).to[Int] + if (version < Version0 || version > CurrentVersion) { + throw new KafkaException(s"Unsupported version: $version of feature information: " + + s"${new String(jsonBytes, UTF_8)}") + } + + val featuresMap = featureInfo + .get(FeaturesKey) + .flatMap(_.to[Option[Map[String, Map[String, Long]]]]) + if (featuresMap.isEmpty) { + throw new KafkaException("Features map can not be absent in: " + + s"${new String(jsonBytes, UTF_8)}") + } + val features = asJavaMap(featuresMap.get) + + val statusInt = 
featureInfo + .get(StatusKey) + .flatMap(_.to[Option[Int]]) + if (statusInt.isEmpty) { + throw new KafkaException("Status can not be absent in feature information: " + + s"${new String(jsonBytes, UTF_8)}") + } + val status = FeatureZNodeStatus.withNameOpt(statusInt.get) + if (status.isEmpty) { Review comment: Done. For the other point, I don't feel strongly for it. I feel it is OK to have an API that doesn't throw and just lets the caller decide (based on the context) if an empty returned value is incorrect. ########## File path: core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ########## @@ -776,6 +776,9 @@ class KafkaConfigTest { case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore + //Feature configuration Review comment: Done. ########## File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala ########## @@ -1567,6 +1567,36 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo createRecursive(path, data = null, throwIfPathExists = false) } + // Visible for testing. + def createFeatureZNode(nodeContents: FeatureZNode): Unit = { Review comment: Done. ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -81,17 +83,26 @@ object BrokerIdsZNode { object BrokerInfo { /** - * Create a broker info with v4 json format (which includes multiple endpoints and rack) if - * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. + * - Create a broker info with v5 json format if the apiVersion is 2.6.x or above. + * - Create a broker info with v4 json format (which includes multiple endpoints and rack) if + * the apiVersion is 0.10.0.X or above but lesser than 2.6.x. + * - Register the broker with v2 json format otherwise. * * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2. 
* - * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to - * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case). + * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above + * without having to upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in + * any case). */ def apply(broker: Broker, apiVersion: ApiVersion, jmxPort: Int): BrokerInfo = { - // see method documentation for the reason why we do this - val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 + val version = { + if (apiVersion >= KAFKA_0_10_0_IV1) Review comment: Done. Good catch! ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -744,6 +782,90 @@ object DelegationTokenInfoZNode { def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes) } +object FeatureZNodeStatus extends Enumeration { + val Disabled, Enabled = Value + + def withNameOpt(value: Int): Option[Value] = { + values.find(_.id == value) + } +} + +case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) { +} + +object FeatureZNode { + private val VersionKey = "version" + private val StatusKey = "status" + private val FeaturesKey = "features" + + // Version0 contains 'version', 'status' and 'features' keys. 
+ val Version0 = 0 + val CurrentVersion = Version0 + + def path = "/feature" + + def asJavaMap(scalaMap: Map[String, Map[String, Long]]): util.Map[String, util.Map[String, java.lang.Long]] = { + scalaMap + .view.mapValues(_.view.mapValues(scalaLong => java.lang.Long.valueOf(scalaLong)).toMap.asJava) + .toMap + .asJava + } + + def encode(featureZNode: FeatureZNode): Array[Byte] = { + val jsonMap = collection.mutable.Map( + VersionKey -> CurrentVersion, + StatusKey -> featureZNode.status.id, + FeaturesKey -> featureZNode.features.serialize) + Json.encodeAsBytes(jsonMap.asJava) + } + + def decode(jsonBytes: Array[Byte]): FeatureZNode = { + Json.tryParseBytes(jsonBytes) match { + case Right(js) => + val featureInfo = js.asJsonObject + val version = featureInfo(VersionKey).to[Int] + if (version < Version0 || version > CurrentVersion) { + throw new KafkaException(s"Unsupported version: $version of feature information: " + + s"${new String(jsonBytes, UTF_8)}") + } + + val featuresMap = featureInfo + .get(FeaturesKey) + .flatMap(_.to[Option[Map[String, Map[String, Long]]]]) + if (featuresMap.isEmpty) { + throw new KafkaException("Features map can not be absent in: " + + s"${new String(jsonBytes, UTF_8)}") + } + val features = asJavaMap(featuresMap.get) + + val statusInt = featureInfo + .get(StatusKey) + .flatMap(_.to[Option[Int]]) + if (statusInt.isEmpty) { + throw new KafkaException("Status can not be absent in feature information: " + Review comment: Done. Changed to `IllegalArgumentException`. Good point! 
########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -744,6 +782,90 @@ object DelegationTokenInfoZNode { def decode(bytes: Array[Byte]): Option[TokenInformation] = DelegationTokenManager.fromBytes(bytes) } +object FeatureZNodeStatus extends Enumeration { + val Disabled, Enabled = Value + + def withNameOpt(value: Int): Option[Value] = { + values.find(_.id == value) + } +} + +case class FeatureZNode(status: FeatureZNodeStatus.Value, features: Features[FinalizedVersionRange]) { +} + +object FeatureZNode { Review comment: As far as I can see, no ZK node class defined in this file is defined in such a way. Every class in this file encodes/decodes JSON by itself, and manages its own attributes. Should we break the norm? ########## File path: core/src/main/scala/kafka/zk/ZkData.scala ########## @@ -146,7 +162,14 @@ object BrokerIdZNode { val plaintextEndpoint = broker.endPoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse( new EndPoint(null, -1, null, null)) encode(brokerInfo.version, plaintextEndpoint.host, plaintextEndpoint.port, broker.endPoints, brokerInfo.jmxPort, - broker.rack) + broker.rack, broker.features) + } + + def asJavaMap(brokerInfo: JsonObject): util.Map[String, util.Map[String, java.lang.Long]] = { Review comment: Done. ########## File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala ########## @@ -811,8 +828,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { assertEquals(Seq.empty, zkClient.getSortedBrokerList) assertEquals(None, zkClient.getBroker(0)) - val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT) - val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL) + val brokerInfo0 = createBrokerInfo( Review comment: See L848 below where it is validated. The call to `zkClient.getAllBrokersInCluster` decodes each `BrokerIdZNode` content from JSON to a `BrokerInfo` object. 
Then, we check whether the call returns exactly the same `BrokerInfo` objects defined here, and, along the way features are checked too. ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,74 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. + * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. Review comment: Done. ########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -812,6 +817,8 @@ object KafkaConfig { val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server" + /** ********* Feature configuration ***********/ + val FeatureChangeListenerCacheUpdateWaitTimeMsDoc = "# of milli seconds to wait for feature cache to be updated once." Review comment: Done. 
########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,200 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. 
+ */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. 
+ * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. 
+ object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + processNotification() Review comment: You bring up a good point. My main concern is availability. If we exit the Broker here, then, whenever the feature ZK node gets deleted (accidentally), it could crash all brokers in the fleet all at once leading to an availability problem. With regards to violating feature semantics, good point. I'm in 2 minds here, and perhaps we can also hear @hachikuji 's thoughts on this topic. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. 
+ * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. 
+ * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. + object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") Review comment: I didn't understand the question. 
Are you saying the logging severity should be lower or higher? This is a rare case anyway as the feature node doesn't get created often, so, `info` logging seems fine to me. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { Review comment: Done. 
Yes, I feel a JSON deserialization failure should be treated as fatal. It should never happen, and it can indicate corruption. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully.
+ */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. 
+ * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. 
+ object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + // This event may happen, rarely (ex: operational error). + // In such a case, we prefer to just log a warning and treat the case as if the node is absent, + // and populate the FinalizedFeatureCache with empty finalized features. + processNotification() + } + } + + private val queue = new LinkedBlockingQueue[FeatureCacheUpdater] + + private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread") + + /** + * This method initializes the feature ZK node change listener. Optionally, it also ensures to + * update the FinalizedFeatureCache once with the latest contents of the feature ZK node + * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers + * are conveniently detected before the initOrThrow() method returns to the caller. If feature + * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker + * would exit eventually. + * + * @param waitOnceForCacheUpdateMs # of milli seconds to wait for feature cache to be updated once. + * If this parameter <= 0, no wait operation happens. + * + * @throws Exception if feature incompatibility check could not be finished in a timely manner + */ + def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = { + thread.start() + zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler) Review comment: Yes. 
Here is the documentation explaining the same: https://zookeeper.apache.org/doc/r3.5.7/zookeeperProgrammers.html#Java+Binding. > When a ZooKeeper object is created, two threads are created as well: an IO thread and an event thread. All IO happens on the IO thread (using Java NIO). All event callbacks happen on the event thread. Session maintenance such as reconnecting to ZooKeeper servers and maintaining heartbeat is done on the IO thread. Responses for synchronous methods are also processed in the IO thread. All responses to asynchronous methods and watch events are processed on the event thread. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. 
+ * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. 
+ * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. + * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) Review comment: Done. But it's actually "Change notification queue interrupted". ########## File path: core/src/main/scala/kafka/server/SupportedFeatures.scala ########## @@ -0,0 +1,74 @@ +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A common object used in the Broker to define the latest features supported by the Broker. 
+ * Also provides API to check for incompatibilities between the latest features supported by the + * Broker and cluster-wide finalized features. + */ +object SupportedFeatures extends Logging { + + /** + * This is the latest features supported by the Broker. + * This is currently empty, but in the future as we define supported features, this map should be + * populated. + */ + @volatile private var supportedFeatures = emptySupportedFeatures + + /** + * Returns the latest features supported by the Broker. + */ + def get: Features[SupportedVersionRange] = { + supportedFeatures + } + + // Should be used only for testing. + def update(newFeatures: Features[SupportedVersionRange]): Unit = { + supportedFeatures = newFeatures + } + + // Should be used only for testing. + def clear(): Unit = { + supportedFeatures = emptySupportedFeatures + } + + /** + * Returns the set of feature names found to be 'incompatible'. + * A feature incompatibility is a version mismatch between the latest feature supported by the + * Broker, and the provided cluster-wide finalized feature. This can happen because a provided + * cluster-wide finalized feature: + * 1) Does not exist in the Broker (i.e. it is unknown to the Broker). + * [OR] + * 2) Exists but the FinalizedVersionRange does not match with the supported feature's SupportedVersionRange. + * + * @param finalized The finalized features against which incompatibilities need to be checked for. + * + * @return The set of incompatible feature names. If the returned set is empty, it + * means there were no feature incompatibilities found. + */ + def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Set[String] = { Review comment: Done. Good point! 
########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. 
+ */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. 
+ * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. 
+ object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + // This event may happen, rarely (ex: operational error). + // In such a case, we prefer to just log a warning and treat the case as if the node is absent, + // and populate the FinalizedFeatureCache with empty finalized features. + processNotification() + } + } + + private val queue = new LinkedBlockingQueue[FeatureCacheUpdater] + + private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread") + + /** + * This method initializes the feature ZK node change listener. Optionally, it also ensures to + * update the FinalizedFeatureCache once with the latest contents of the feature ZK node + * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers + * are conveniently detected before the initOrThrow() method returns to the caller. If feature + * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker + * would exit eventually. + * + * @param waitOnceForCacheUpdateMs # of milli seconds to wait for feature cache to be updated once. + * If this parameter <= 0, no wait operation happens. 
+ * + * @throws Exception if feature incompatibility check could not be finished in a timely manner + */ + def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = { + thread.start() + zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler) + + if (waitOnceForCacheUpdateMs > 0) { + val barrier = new CountDownLatch(1) Review comment: Done. Removed. ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. 
+ * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. 
+ * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() Review comment: The function blocks indefinitely - yes. But this shouldn't cause a problem or lead to deadlock/limbo situation. Even if this thread is waiting for an item to become available in the queue, the waiting thread can always get interrupted by the `FinalizedFeatureChangeListener.close()` call which calls `ShutdownableThread.shutdown()`. 
Note that the `ShutdownableThread.shutdown()` method interrupts the thread, which should unblock any waiting `queue.take()` operation and make it raise an `InterruptedException`: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/utils/ShutdownableThread.scala#L32-L59 https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#take() ########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,203 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to propvide notification + * when an update operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath.
If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * only exactly once successfully. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + val (mayBeFeatureZNodeBytes, version) = zkClient.getDataAndVersion(featureZkNodePath) + if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { + FinalizedFeatureCache.updateOrThrow(featureZNode.features, version) + } else { + throw new IllegalStateException(s"Unexpected FeatureZNodeStatus found in $featureZNode") + } + } + + maybeNotifyOnce.foreach(notifier => notifier.countDown()) + } + + /** + * Waits until at least a single updateLatestOrThrow completes successfully. + * NOTE: The method returns immediately if an updateLatestOrThrow call has already completed + * successfully. 
+ * + * @param waitTimeMs the timeout for the wait operation + * + * @throws - RuntimeException if the thread was interrupted during wait + * - TimeoutException if the wait can not be completed in waitTimeMs + * milli seconds + */ + def awaitUpdateOrThrow(waitTimeMs: Long): Unit = { + maybeNotifyOnce.foreach(notifier => { + var success = false + try { + success = notifier.await(waitTimeMs, TimeUnit.MILLISECONDS) + } catch { + case e: InterruptedException => + throw new RuntimeException( + "Unable to wait for FinalizedFeatureCache update to finish.", e) + } + + if (!success) { + throw new TimeoutException( + s"Timed out after waiting for ${waitTimeMs}ms for FeatureCache to be updated.") + } + }) + } + } + + /** + * A shutdownable thread to process feature node change notifications that are populated into the + * queue. If any change notification can not be processed successfully (unless it is due to an + * interrupt), the thread treats it as a fatal event and triggers Broker exit. + * + * @param name name of the thread + */ + private class ChangeNotificationProcessorThread(name: String) extends ShutdownableThread(name = name) { + override def doWork(): Unit = { + try { + queue.take.updateLatestOrThrow() + } catch { + case e: InterruptedException => info(s"Interrupted", e) + case e: Exception => { + error("Failed to process feature ZK node change event. The broker will exit.", e) + throw new FatalExitError(1) + } + } + } + } + + // Feature ZK node change handler. 
+ object FeatureZNodeChangeHandler extends ZNodeChangeHandler { + override val path: String = FeatureZNode.path + + private def processNotification(): Unit = { + queue.add(new FeatureCacheUpdater(path)) + } + + override def handleCreation(): Unit = { + info(s"Feature ZK node created at path: $path") + processNotification() + } + + override def handleDataChange(): Unit = { + info(s"Feature ZK node updated at path: $path") + processNotification() + } + + override def handleDeletion(): Unit = { + warn(s"Feature ZK node deleted at path: $path") + // This event may happen, rarely (ex: operational error). + // In such a case, we prefer to just log a warning and treat the case as if the node is absent, + // and populate the FinalizedFeatureCache with empty finalized features. + processNotification() + } + } + + private val queue = new LinkedBlockingQueue[FeatureCacheUpdater] + + private val thread = new ChangeNotificationProcessorThread("feature-zk-node-event-process-thread") + + /** + * This method initializes the feature ZK node change listener. Optionally, it also ensures to + * update the FinalizedFeatureCache once with the latest contents of the feature ZK node + * (if the node exists). This step helps ensure that feature incompatibilities (if any) in brokers + * are conveniently detected before the initOrThrow() method returns to the caller. If feature + * incompatibilities are detected, this method will throw an Exception to the caller, and the Broker + * would exit eventually. + * + * @param waitOnceForCacheUpdateMs # of milli seconds to wait for feature cache to be updated once. + * If this parameter <= 0, no wait operation happens. 
+ * + * @throws Exception if feature incompatibility check could not be finished in a timely manner + */ + def initOrThrow(waitOnceForCacheUpdateMs: Long): Unit = { + thread.start() + zkClient.registerZNodeChangeHandlerAndCheckExistence(FeatureZNodeChangeHandler) + + if (waitOnceForCacheUpdateMs > 0) { + val barrier = new CountDownLatch(1) + val ensureCacheUpdateOnce = new FeatureCacheUpdater(FeatureZNodeChangeHandler.path, Some(barrier)) + queue.add(ensureCacheUpdateOnce) + try { + ensureCacheUpdateOnce.awaitUpdateOrThrow(waitOnceForCacheUpdateMs) + } catch { + case e: Exception => { + close() + throw e + } + } + } + } + + /** + * Closes the feature ZK node change listener by unregistering the listener from ZK client, + * clearing the queue and shutting down the ChangeNotificationProcessorThread. + */ + def close(): Unit = { + zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path) + queue.clear() + thread.shutdown() + thread.join() + } + + // Useful for testing. Review comment: Done. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java ########## @@ -17,13 +17,19 @@ package org.apache.kafka.common.requests; +import org.apache.kafka.common.feature.Features; Review comment: Done. ########## File path: core/src/main/scala/kafka/server/KafkaServer.scala ########## @@ -210,6 +215,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP /* setup zookeeper */ initZkClient(time) + /* initialize features */ + _featureChangeListener = new FinalizedFeatureChangeListener(_zkClient) + if (config.interBrokerProtocolVersion >= KAFKA_2_6_IV1) { + // The feature versioning system (KIP-584) is active only when: Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org