hachikuji commented on a change in pull request #10003:
URL: https://github.com/apache/kafka/pull/10003#discussion_r568216372



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
+  stateChangeLogger.info(s"Metadata changes deferred=$deferringMetadataChanges")
+
+  private def confirmNotDeferringMetadataUpdatesWithZooKeeper(): Unit = {
+    if (deferringMetadataChanges) {
+      throw new IllegalStateException("Partition metadata changes should never be deferred when using ZooKeeper")
+    }
+  }
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      if (config.requiresZookeeper) {
+        throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
+      }
+      deferringMetadataChanges = true
+      stateChangeLogger.info(s"Metadata changes are now being deferred")
+    }
+  }
+
+  def endMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      // TODO: implement for Raft-based metadata log messages

Review comment:
       nit: we can leave the TODO out. 
   
   Also, it might be worth checking `config.requiresZookeeper` as we do in 
`beginMetadataChangeDeferral`. If nothing else, it serves as explicit 
documentation.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -111,7 +111,7 @@ class ReplicaAlterLogDirsThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                    partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
-    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val partition = replicaMgr.onlinePartition(topicPartition).get

Review comment:
       It seems like we should be using `getPartitionOrException` here. 
Similarly in `ReplicaFetcherThread`.
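
   i.e., something along these lines (a sketch; assumes `getPartitionOrException` raises the appropriate exception when the partition is missing or offline):
   ```scala
   // fails fast with a meaningful error instead of Option.get's NoSuchElementException
   val partition = replicaMgr.getPartitionOrException(topicPartition)
   ```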

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1257,7 +1307,8 @@ class ReplicaManager(val config: KafkaConfig,
 
  def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
-      if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
+      confirmNotDeferringMetadataUpdatesWithZooKeeper()

Review comment:
       I think the check here and in `becomeLeaderOrFollower` is overkill. We 
already ensure that `beginMetadataChangeDeferral` cannot be called when using 
zk.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -316,7 +324,7 @@ class ReplicaManager(val config: KafkaConfig,
   private def maybeRemoveTopicMetrics(topic: String): Unit = {
     val topicHasOnlinePartition = allPartitions.values.exists {
       case HostedPartition.Online(partition) => topic == partition.topic
-      case HostedPartition.None | HostedPartition.Offline => false
+      case _ => false

Review comment:
       This is one case where we probably _do_ want to act in the deferred state. One option I was considering is whether we should give `HostedPartition` something like
   ```scala
   sealed trait HostedPartition {
     def partition: Option[Partition]
   }
   ```
   Then we could refactor this as a `filter` or something. 
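
   For example, something like this (a rough sketch; assumes `Online` and `Deferred` would return `Some(partition)` while `None` and `Offline` return `None`):
   ```scala
   // counts both online and deferred partitions for the topic
   val topicHasHostedPartition = allPartitions.values.exists { hosted =>
     hosted.partition.exists(_.topic == topic)
   }
   ```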

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
+  stateChangeLogger.info(s"Metadata changes deferred=$deferringMetadataChanges")
+
+  private def confirmNotDeferringMetadataUpdatesWithZooKeeper(): Unit = {
+    if (deferringMetadataChanges) {
+      throw new IllegalStateException("Partition metadata changes should never be deferred when using ZooKeeper")
+    }
+  }
+
+  def beginMetadataChangeDeferral(): Unit = {
+    replicaStateChangeLock synchronized {
+      if (config.requiresZookeeper) {

Review comment:
       nit: doesn't matter too much, but we may as well pull this outside the 
lock
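
   i.e. (a sketch; assumes the broker's ZooKeeper/Raft mode is fixed at startup, so the check is safe outside the lock):
   ```scala
   def beginMetadataChangeDeferral(): Unit = {
     // config mode doesn't change at runtime, so no need to hold the lock for this check
     if (config.requiresZookeeper) {
       throw new IllegalStateException("Partition metadata changes can never be deferred when using ZooKeeper")
     }
     replicaStateChangeLock synchronized {
       deferringMetadataChanges = true
       stateChangeLogger.info(s"Metadata changes are now being deferred")
     }
   }
   ```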

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper
+  stateChangeLogger.info(s"Metadata changes deferred=$deferringMetadataChanges")

Review comment:
       I think we can remove this or lower to debug. It doesn't seem too useful.
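
   e.g., if kept at all (sketch):
   ```scala
   stateChangeLogger.debug(s"Metadata changes deferred=$deferringMetadataChanges")
   ```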

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -713,6 +760,9 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.Offline =>
              throw new KafkaStorageException(s"Partition $topicPartition is offline")
 
+            case HostedPartition.Deferred(_) =>
+              throw new KafkaStorageException(s"Partition $topicPartition is deferred")

Review comment:
       Hmm, this doesn't seem right. Not sure why a deferred change would cause 
a storage error. How about we throw `IllegalStateException` instead to make it 
clear that this is an unexpected state for now.
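
   i.e., something like (sketch):
   ```scala
   case HostedPartition.Deferred(_) =>
     // deferred is an unexpected state here, not a storage failure
     throw new IllegalStateException(s"Partition $topicPartition is deferred")
   ```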

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // Changes are initially deferred when using a Raft-based metadata quorum, and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker (re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for that partition.
+  @volatile private var deferringMetadataChanges: Boolean = !config.requiresZookeeper

Review comment:
       Why does this need to be volatile?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

