chia7712 commented on code in PR #19611:
URL: https://github.com/apache/kafka/pull/19611#discussion_r2094571756


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -79,49 +80,65 @@ public int numPartitions(Uuid topicId) {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
+        TopicImage topic = metadataImage.topics().getTopic(topicId);
+        if (topic != null) {
+            PartitionRegistration partitionRegistration = topic.partitions().get(partition);
+            if (partitionRegistration != null) {
+                Set<String> racks = new HashSet<>();
+                for (int replica : partitionRegistration.replicas) {
+                    // Only add the rack if it is available for the broker/replica.
+                    metadataImage.cluster().broker(replica).rack().ifPresent(racks::add);
+                }
+                return racks;
+            }
+        }
         return Set.of();
     }
 
     /**
-     * Returns a set of assignable partitions from the topic metadata.
-     * If the allowed partition map is null, all the partitions in the corresponding
-     * topic metadata are returned for the argument topic id. If allowed map is empty,
+     * Returns a set of assignable partitions from the metadata image.
+     * If the allowed partition map is Optional.empty(), all the partitions in the corresponding
+     * topic image are returned for the argument topic id. If allowed map is empty,
      * empty set is returned.
      *
      * @param topicId The uuid of the topic
      * @return Set of integers if assignable partitions available, empty otherwise.
      */
     @Override
     public Set<Integer> assignablePartitions(Uuid topicId) {
-        TopicMetadata topic = this.topicMetadata.get(topicId);
+        TopicImage topic = metadataImage.topics().getTopic(topicId);
         if (topic == null) {
             return Set.of();
         }
 
-        if (topicPartitionAllowedMap == null) {
-            return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet());
+        if (topicPartitionAllowedMap.isEmpty()) {
+            return Set.copyOf(topic.partitions().keySet());

Review Comment:
   As @dajac mentioned, maybe we can make `TopicImage` wrap its partitions in an
   immutable collection, since the map is never modified once built.
   ```java
       public TopicImage(String name,
                         Uuid id,
                         Map<Integer, PartitionRegistration> partitions) {
           this.name = name;
           this.id = id;
           this.partitions = Collections.unmodifiableMap(partitions);
       }
   ```
   
   With this change, we can return `topic.partitions().keySet()` directly, as it
   is now immutable.
   
   BTW, I have opened https://issues.apache.org/jira/browse/KAFKA-19305 to fix
   the other image classes too.
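
To illustrate the suggestion above, here is a minimal, self-contained sketch. `SimpleTopicImage` is a hypothetical stand-in for `TopicImage` (it is not the Kafka class, and its partition values are plain strings rather than `PartitionRegistration`). It only shows the general JDK behavior that the key set of a map wrapped with `Collections.unmodifiableMap` rejects mutation, so a caller could return it without a defensive `Set.copyOf`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class TopicImageSketch {
    // Hypothetical, simplified stand-in for TopicImage (assumption, not Kafka code).
    static final class SimpleTopicImage {
        private final String name;
        private final Map<Integer, String> partitions;

        SimpleTopicImage(String name, Map<Integer, String> partitions) {
            this.name = name;
            // Wrap once at construction; views derived from this map reject mutation.
            this.partitions = Collections.unmodifiableMap(partitions);
        }

        String name() {
            return name;
        }

        Map<Integer, String> partitions() {
            return partitions;
        }
    }

    // Returns true if the key set of the image's partition map rejects an add.
    static boolean keySetRejectsMutation(SimpleTopicImage image) {
        Set<Integer> keys = image.partitions().keySet();
        try {
            keys.add(99);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> source = new HashMap<>();
        source.put(0, "p0");
        source.put(1, "p1");
        SimpleTopicImage image = new SimpleTopicImage("foo", source);
        System.out.println(keySetRejectsMutation(image));
    }
}
```

One caveat on the design choice: `Collections.unmodifiableMap` returns a view, so the result is only effectively immutable if nothing retains and mutates the map passed to the constructor. `Set.copyOf`, by contrast, takes a snapshot at call time.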


