hachikuji commented on a change in pull request #10003:
URL: https://github.com/apache/kafka/pull/10003#discussion_r568216372
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
+  stateChangeLogger.info(s"Metadata changes deferred=$deferringMetadataChanges")
+
+  private def confirmNotDeferringMetadataUpdatesWithZooKeeper(): Unit = {
+    if (deferringMetadataChanges) {
+      throw new IllegalStateException("Partition metadata changes should never be deferred when using ZooKeeper")
+    }
+  }
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      if (config.requiresZookeeper) {
+        throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
+      }
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      // TODO: implement for Raft-based metadata log messages

Review comment:
nit: we can leave the TODO out. Also, it might be worth checking `config.requiresZookeeper` as we do in `beginMetadataChangeDeferral`. If nothing else, it serves as explicit documentation.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -111,7 +111,7 @@ class ReplicaAlterLogDirsThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
-    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val partition = replicaMgr.onlinePartition(topicPartition).get

Review comment:
It seems like we should be using `getPartitionOrException` here. Similarly in `ReplicaFetcherThread`.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1257,7 +1307,8 @@ class ReplicaManager(val config: KafkaConfig,
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
     replicaStateChangeLock synchronized {
-      if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
+      confirmNotDeferringMetadataUpdatesWithZooKeeper()

Review comment:
I think the check here and in `becomeLeaderOrFollower` is overkill. We already ensure that `beginMetadataChangeDeferral` cannot be called when using ZooKeeper.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -316,7 +324,7 @@ class ReplicaManager(val config: KafkaConfig,
   private def maybeRemoveTopicMetrics(topic: String): Unit = {
     val topicHasOnlinePartition = allPartitions.values.exists {
       case HostedPartition.Online(partition) => topic == partition.topic
-      case HostedPartition.None | HostedPartition.Offline => false
+      case _ => false

Review comment:
This is one case where we probably _do_ want to act in the deferred state. One option I was considering is whether we should give `HostedPartition` something like
```scala
sealed trait HostedPartition {
  def partition: Option[Partition]
}
```
Then we could refactor this as a `filter` or something.
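For illustration only, here is a rough sketch of that refactor, written as if the members lived in `ReplicaManager` like the diff above. It assumes `Deferred` carries a `Partition` (as the earlier hunk implies); the accessor and the rewritten method body are hypothetical, not part of the patch:

```scala
// Hypothetical sketch: every HostedPartition state exposes an Option[Partition],
// so call sites no longer have to enumerate each case individually.
sealed trait HostedPartition {
  def partition: Option[Partition]
}

object HostedPartition {
  case object None extends HostedPartition {
    override def partition: Option[Partition] = Option.empty
  }
  case object Offline extends HostedPartition {
    override def partition: Option[Partition] = Option.empty
  }
  final case class Online(p: Partition) extends HostedPartition {
    override def partition: Option[Partition] = Some(p)
  }
  final case class Deferred(p: Partition) extends HostedPartition {
    override def partition: Option[Partition] = Some(p)
  }
}

// maybeRemoveTopicMetrics could then treat online and deferred partitions uniformly:
private def maybeRemoveTopicMetrics(topic: String): Unit = {
  val topicHasHostedPartition =
    allPartitions.values.flatMap(_.partition).exists(_.topic == topic)
  if (!topicHasHostedPartition) {
    // ... remove the topic-level metrics, as the existing method already does
  }
}
```

With the accessor in place, deferred partitions are still taken into account here, and the check no longer needs a pattern match that must be updated whenever a new state is added.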
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
+  stateChangeLogger.info(s"Metadata changes deferred=$deferringMetadataChanges")
+
+  private def confirmNotDeferringMetadataUpdatesWithZooKeeper(): Unit = {
+    if (deferringMetadataChanges) {
+      throw new IllegalStateException("Partition metadata changes should never be deferred when using ZooKeeper")
+    }
+  }
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      if (config.requiresZookeeper) {

Review comment:
nit: doesn't matter too much, but we may as well pull this outside the lock (see the sketch after the last comment below)

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
+  stateChangeLogger.info(s"Metadata changes deferred=$deferringMetadataChanges")

Review comment:
I think we can remove this or lower it to debug. It doesn't seem too useful.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -713,6 +760,9 @@ class ReplicaManager(val config: KafkaConfig,
       case HostedPartition.Offline =>
        throw new KafkaStorageException(s"Partition $topicPartition is offline")

+      case HostedPartition.Deferred(_) =>
+        throw new KafkaStorageException(s"Partition $topicPartition is deferred")

Review comment:
Hmm, this doesn't seem right. I'm not sure why a deferred change would cause a storage error. How about we throw `IllegalStateException` instead, to make it clear that this is an unexpected state for now?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper. When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper

Review comment:
Why does this need to be volatile?
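For illustration, a minimal sketch of `beginMetadataChangeDeferral` with the suggestions above applied: the `requiresZookeeper` check hoisted out of the lock and the transition logged at debug. This is a hypothetical rewrite, not the actual patch:

```scala
// Sketch only: fail fast on the ZooKeeper check before taking the lock,
// keeping the critical section down to the state change itself.
def beginMetadataChangeDeferral(): Unit = {
  if (config.requiresZookeeper)
    throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
  replicaStateChangeLock synchronized {
    deferringMetadataChanges = true
    stateChangeLogger.debug("Metadata changes are now being deferred")
  }
}
```

Since `config.requiresZookeeper` is configuration rather than state guarded by `replicaStateChangeLock`, checking it before acquiring the lock should not change behavior.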
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org