dajac commented on code in PR #19611:
URL: https://github.com/apache/kafka/pull/19611#discussion_r2094164148


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -61,15 +63,12 @@ public class OptimizedUniformAssignmentBuilderTest {
 
     @Test
     public void testOneMemberNoTopicSubscription() {
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(topic1Uuid, topic1Name, 3)
+            .addRacks()

Review Comment:
   nit: Is `addRacks` necessary here and in all the other places?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -79,49 +80,65 @@ public int numPartitions(Uuid topicId) {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
+        TopicImage topic = metadataImage.topics().getTopic(topicId);
+        if (topic != null) {
+            PartitionRegistration partitionRegistration = 
topic.partitions().get(partition);
+            if (partitionRegistration != null) {
+                Set<String> racks = new HashSet<>();
+                for (int replica : partitionRegistration.replicas) {
+                    // Only add the rack if it is available for the 
broker/replica.
+                    
metadataImage.cluster().broker(replica).rack().ifPresent(racks::add);
+                }
+                return racks;
+            }
+        }
         return Set.of();
     }
 
     /**
-     * Returns a set of assignable partitions from the topic metadata.
-     * If the allowed partition map is null, all the partitions in the 
corresponding
-     * topic metadata are returned for the argument topic id. If allowed map 
is empty,
+     * Returns a set of assignable partitions from the metadata image.
+     * If the allowed partition map is Optional.empty(), all the partitions in 
the corresponding
+     * topic image are returned for the argument topic id. If allowed map is 
empty,
      * empty set is returned.
      *
      * @param topicId The uuid of the topic
      * @return Set of integers if assignable partitions available, empty 
otherwise.
      */
     @Override
     public Set<Integer> assignablePartitions(Uuid topicId) {
-        TopicMetadata topic = this.topicMetadata.get(topicId);
+        TopicImage topic = metadataImage.topics().getTopic(topicId);
         if (topic == null) {
             return Set.of();
         }
 
-        if (topicPartitionAllowedMap == null) {
-            return IntStream.range(0, 
topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet());
+        if (topicPartitionAllowedMap.isEmpty()) {
+            return Set.copyOf(topic.partitions().keySet());

Review Comment:
   Do we really need to make a copy of the Set? The image is immutable, so we 
could just return an unmodifiable view of it. What do you think?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java:
##########
@@ -245,22 +237,21 @@ public void 
testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() {
 
     @Test
     public void testValidityAndBalanceForLargeSampleSet() {
-        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
-        for (int i = 1; i < 100; i++) {
+        MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder();
+        Set<Uuid> topicIds = new HashSet<>();
+        IntStream.range(1, 101).forEach(i -> {

Review Comment:
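   Is 1 to 101 correct? The original loop (`for (int i = 1; i < 100; i++)`) covered 1..99, whereas `IntStream.range(1, 101)` covers 1..100. It may be easier to keep the for loop.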



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -19,41 +19,42 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
 import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.metadata.PartitionRegistration;
 
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * The subscribed topic metadata class is used by the {@link 
PartitionAssignor} to obtain
  * topic and partition metadata for the topics that the modern group is 
subscribed to.
  */
 public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
     /**
-     * The topic Ids mapped to their corresponding {@link TopicMetadata}
-     * object, which contains topic and partition metadata.
+     * The map of topic Ids to the set of allowed partitions for each topic.
+     * If this is empty, all partitions are allowed.
      */
-    private final Map<Uuid, TopicMetadata> topicMetadata;
-    private final Map<Uuid, Set<Integer>> topicPartitionAllowedMap;
+    private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;
 
-    public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> 
topicMetadata) {
-        this(topicMetadata, null);
-    }
+    /**
+     * The metadata image that contains the latest metadata information.
+     */
+    private final MetadataImage metadataImage;
 
-    public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> 
topicMetadata, Map<Uuid, Set<Integer>> topicPartitionAllowedMap) {
-        this.topicMetadata = Objects.requireNonNull(topicMetadata);
-        this.topicPartitionAllowedMap = topicPartitionAllowedMap;
+    public SubscribedTopicDescriberImpl(MetadataImage metadataImage) {
+        this(metadataImage, Optional.empty());
     }
 
-    /**
-     * Map of topic Ids to topic metadata.
-     *
-     * @return The map of topic Ids to topic metadata.
-     */
-    public Map<Uuid, TopicMetadata> topicMetadata() {
-        return this.topicMetadata;
+    public SubscribedTopicDescriberImpl(
+        MetadataImage metadataImage,
+        Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap
+    ) {
+        this.metadataImage = Objects.requireNonNull(metadataImage);
+        this.topicPartitionAllowedMap = topicPartitionAllowedMap;

Review Comment:
   nit: Should we apply `Objects.requireNonNull` to `topicPartitionAllowedMap` too?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java:
##########
@@ -17,94 +17,111 @@
 package org.apache.kafka.coordinator.group.modern;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.MetadataImageBuilder;
+import org.apache.kafka.image.MetadataImage;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class SubscribedTopicMetadataTest {
 
-    private Map<Uuid, TopicMetadata> topicMetadataMap;
     private SubscribedTopicDescriberImpl subscribedTopicMetadata;
+    private MetadataImage metadataImage;
+    private final int numPartitions = 5;
 
     @BeforeEach
     public void setUp() {
-        topicMetadataMap = new HashMap<>();
-        for (int i = 0; i < 5; i++) {
+        MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder();
+        IntStream.range(0, 5).forEach(i -> {

Review Comment:
   nit: Let's keep the for loop here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to