hachikuji commented on a change in pull request #8724: URL: https://github.com/apache/kafka/pull/8724#discussion_r434702023
########## File path: core/src/main/scala/kafka/controller/ControllerContext.scala ########## @@ -391,6 +404,90 @@ class ControllerContext { partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet } + def putPartitionLeadershipInfo(partition: TopicPartition, + leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = { + val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) + val replicaAssignment = partitionFullReplicaAssignment(partition) + updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous, + Some(replicaAssignment), Some(leaderIsrAndControllerEpoch)) + } + + def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = { + partitionLeadershipInfo.get(partition) + } + + def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = { + partitionLeadershipInfo + } + + def partitionsWithLeaders(): Set[TopicPartition] = { + partitionLeadershipInfo.keySet + } + + def partitionsWithoutLeaders(): Set[TopicPartition] = { + partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) => + !isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) && + !isTopicQueuedUpForDeletion(topicPartition.topic) + }.keySet + } + + def partitionLeadsOnBroker(brokerId: Int): Set[TopicPartition] = { Review comment: nit: should `partitionLeadsOnBroker` be renamed to `partitionLeadersOnBroker`? 
########## File path: core/src/main/scala/kafka/controller/ControllerContext.scala ########## @@ -391,6 +404,90 @@ class ControllerContext { partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet } + def putPartitionLeadershipInfo(partition: TopicPartition, + leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = { + val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) + val replicaAssignment = partitionFullReplicaAssignment(partition) + updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous, + Some(replicaAssignment), Some(leaderIsrAndControllerEpoch)) + } + + def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = { + partitionLeadershipInfo.get(partition) + } + + def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = { + partitionLeadershipInfo + } + + def partitionsWithLeaders(): Set[TopicPartition] = { + partitionLeadershipInfo.keySet Review comment: This definition seems inconsistent with `partitionsWithoutLeaders`. I think you're just trying to preserve the existing logic. It might make sense to use a different name to avoid the apparent inconsistency. Maybe we could change `partitionsWithoutLeaders` to be `partitionsWithOfflineLeaders` or something like that. Looking at the caller, it looks like it would be reasonable to exclude topics which are being queued for deletion in both cases, but we could change that separately if you think it is risky. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org