chia7712 commented on code in PR #19611: URL: https://github.com/apache/kafka/pull/19611#discussion_r2094571756
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -79,49 +80,65 @@ public int numPartitions(Uuid topicId) { */ @Override public Set<String> racksForPartition(Uuid topicId, int partition) { + TopicImage topic = metadataImage.topics().getTopic(topicId); + if (topic != null) { + PartitionRegistration partitionRegistration = topic.partitions().get(partition); + if (partitionRegistration != null) { + Set<String> racks = new HashSet<>(); + for (int replica : partitionRegistration.replicas) { + // Only add the rack if it is available for the broker/replica. + metadataImage.cluster().broker(replica).rack().ifPresent(racks::add); + } + return racks; + } + } return Set.of(); } /** - * Returns a set of assignable partitions from the topic metadata. - * If the allowed partition map is null, all the partitions in the corresponding - * topic metadata are returned for the argument topic id. If allowed map is empty, + * Returns a set of assignable partitions from the metadata image. + * If the allowed partition map is Optional.empty(), all the partitions in the corresponding + * topic image are returned for the argument topic id. If allowed map is empty, * empty set is returned. * * @param topicId The uuid of the topic * @return Set of integers if assignable partitions available, empty otherwise. */ @Override public Set<Integer> assignablePartitions(Uuid topicId) { - TopicMetadata topic = this.topicMetadata.get(topicId); + TopicImage topic = metadataImage.topics().getTopic(topicId); if (topic == null) { return Set.of(); } - if (topicPartitionAllowedMap == null) { - return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet()); + if (topicPartitionAllowedMap.isEmpty()) { + return Set.copyOf(topic.partitions().keySet()); Review Comment: As @dajac mentioned, maybe we can make `TopicImage` create immutable collection as it is never modified once built. ```java public TopicImage(String name, Uuid id, Map<Integer, PartitionRegistration> partitions) { this.name = name; this.id = id; this.partitions = Collections.unmodifiableMap(partitions); } ``` with this change, we can return `topic.partitions().keySet()` directly as it is immutable now. BTW, I have opened https://issues.apache.org/jira/browse/KAFKA-19305 to fix other image classes too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org