kowshik commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r429591914
########## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ########## @@ -0,0 +1,232 @@ +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. + * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to notify the caller when an + * updateLatestOrThrow() operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + + def this(featureZkNodePath: String) = this(featureZkNodePath, Option.empty) + + /** + * Updates the feature cache in FinalizedFeatureCache with the latest features read from the + * ZK node in featureZkNodePath. If the cache update is not successful, then, a suitable + * exception is raised. + * + * NOTE: if a notifier was provided in the constructor, then, this method can be invoked + * successfully only once. + * + * @throws IllegalStateException, if a non-empty notifier was provided in the constructor, and + * this method is called again after a successful previous invocation. 
+ * + * FeatureCacheUpdateException, if there was an error in updating the + * FinalizedFeatureCache. + * + * RuntimeException, if there was a failure in reading/deserializing the + * contents of the feature ZK node. + */ + def updateLatestOrThrow(): Unit = { + maybeNotifyOnce.foreach(notifier => { + if (notifier.getCount != 1) { + throw new IllegalStateException( + "Can not notify after updateLatestOrThrow was called more than once successfully.") + } + }) + + info(s"Reading feature ZK node at path: $featureZkNodePath") + var mayBeFeatureZNodeBytes: Option[Array[Byte]] = null + var version: Int = ZkVersion.UnknownVersion + try { + val result = zkClient.getDataAndVersion(featureZkNodePath) + mayBeFeatureZNodeBytes = result._1 + version = result._2 + } catch { + // Convert to RuntimeException, so that the failure is not mistaken for a bad argument passed + // to updateLatestOrThrow(), which takes no arguments. + case e: IllegalArgumentException => throw new RuntimeException(e) + } + + // There are 4 cases: + // + // (empty dataBytes, valid version) => The empty dataBytes will fail FeatureZNode deserialization. + // FeatureZNode, when present in ZK, can not have empty contents. + // (non-empty dataBytes, valid version) => This is a valid case, and should pass FeatureZNode deserialization + // if dataBytes contains valid data. + // (empty dataBytes, unknown version) => This is a valid case, and this can happen if the FeatureZNode + // does not exist in ZK. + // (non-empty dataBytes, unknown version) => This case is impossible, since, KafkaZkClient.getDataAndVersion + // API ensures that unknown version is returned only when the + // ZK node is absent. Therefore dataBytes should be empty in such + // a case. 
+ if (version == ZkVersion.UnknownVersion) { + info(s"Feature ZK node at path: $featureZkNodePath does not exist") + FinalizedFeatureCache.clear() + } else { + val featureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) + if (featureZNode.status == FeatureZNodeStatus.Disabled) { + info(s"Feature ZK node at path: $featureZkNodePath is in disabled status") + FinalizedFeatureCache.clear() + } else if(featureZNode.status == FeatureZNodeStatus.Enabled) { Review comment: Done. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org