rondagostino commented on a change in pull request #10003:
URL: https://github.com/apache/kafka/pull/10003#discussion_r568716273



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -316,7 +324,7 @@ class ReplicaManager(val config: KafkaConfig,
   private def maybeRemoveTopicMetrics(topic: String): Unit = {
     val topicHasOnlinePartition = allPartitions.values.exists {
       case HostedPartition.Online(partition) => topic == partition.topic
-      case HostedPartition.None | HostedPartition.Offline => false
+      case _ => false

Review comment:
       > a sealed trait with def partition: Partition
   
   I like this solution better than making everything a case class with an 
`Option[Partition]`.  Will push a commit with it shortly.
   

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -257,6 +264,37 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  // Changes are initially deferred when using a Raft-based metadata quorum, 
and they may flip-flop to not
+  // being deferred and being deferred again thereafter as the broker 
(re)acquires/loses its lease.
+  // Changes are never deferred when using ZooKeeper.  When true, this 
indicates that we should transition
+  // online partitions to the deferred state if we see a metadata update for 
that partition.
+  @volatile private var deferringMetadataChanges: Boolean = 
!config.requiresZookeeper

Review comment:
       Good point.  The same thread will be both managing this value and 
submitting metadata changes, so it can remember if it has already started 
deferring changes if it needs to do so; plus everything here related to 
deferral happens within the lock anyway.  I removed `@volatile`.

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -111,7 +111,7 @@ class ReplicaAlterLogDirsThread(name: String,
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
                                     partitionData: PartitionData[Records]): 
Option[LogAppendInfo] = {
-    val partition = replicaMgr.nonOfflinePartition(topicPartition).get
+    val partition = replicaMgr.onlinePartition(topicPartition).get

Review comment:
       Yes, this appears to be an existing issue.  I fixed it in 4 places.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -942,7 +942,7 @@ class ReplicaManager(val config: KafkaConfig,
                                requiredAcks: Short): Map[TopicPartition, 
LogAppendResult] = {
     val traceEnabled = isTraceEnabled
     def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
-      val logStartOffset = 
onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1)
+      val logStartOffset: Long = 
onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1)

Review comment:
       Doh! I was confused about why it had failed.  Will fix.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to