hachikuji commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434702023



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => 
states.contains(partitionState(partition)) }.toSet
   }
 
+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: 
LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, 
leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), 
previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): 
Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, 
LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet
+  }
+
+  def partitionsWithoutLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.filter { case (topicPartition, 
leaderIsrAndControllerEpoch) =>
+      !isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, 
topicPartition) &&
+        !isTopicQueuedUpForDeletion(topicPartition.topic)
+    }.keySet
+  }
+
+  def partitionLeadsOnBroker(brokerId: Int): Set[TopicPartition] = {

Review comment:
       nit: rename this to `partitionLeadersOnBroker`?

##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => 
states.contains(partitionState(partition)) }.toSet
   }
 
+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: 
LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, 
leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), 
previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): 
Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, 
LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet

Review comment:
       This definition seems inconsistent with `partitionsWithoutLeaders`: 
this method returns every partition that has leadership info, while 
`partitionsWithoutLeaders` returns only partitions whose leader is offline and 
whose topic is not queued for deletion, so the two are not complements of each 
other. I think you're just trying to preserve the existing logic, but it might 
make sense to use a different name to avoid the apparent inconsistency. Maybe 
we could rename `partitionsWithoutLeaders` to `partitionsWithOfflineLeaders` or 
something like that. Looking at the caller, it seems reasonable to exclude 
topics which are queued for deletion in both cases, but we could change that 
separately if you think it is risky.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to