dajac commented on code in PR #19611: URL: https://github.com/apache/kafka/pull/19611#discussion_r2094164148
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ########## @@ -61,15 +63,12 @@ public class OptimizedUniformAssignmentBuilderTest { @Test public void testOneMemberNoTopicSubscription() { + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topic1Uuid, topic1Name, 3) + .addRacks() Review Comment: nit: Is `addRacks` necessary here and in all the other places? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -79,49 +80,65 @@ public int numPartitions(Uuid topicId) { */ @Override public Set<String> racksForPartition(Uuid topicId, int partition) { + TopicImage topic = metadataImage.topics().getTopic(topicId); + if (topic != null) { + PartitionRegistration partitionRegistration = topic.partitions().get(partition); + if (partitionRegistration != null) { + Set<String> racks = new HashSet<>(); + for (int replica : partitionRegistration.replicas) { + // Only add the rack if it is available for the broker/replica. + metadataImage.cluster().broker(replica).rack().ifPresent(racks::add); + } + return racks; + } + } return Set.of(); } /** - * Returns a set of assignable partitions from the topic metadata. - * If the allowed partition map is null, all the partitions in the corresponding - * topic metadata are returned for the argument topic id. If allowed map is empty, + * Returns a set of assignable partitions from the metadata image. + * If the allowed partition map is Optional.empty(), all the partitions in the corresponding + * topic image are returned for the argument topic id. If allowed map is empty, * empty set is returned. * * @param topicId The uuid of the topic * @return Set of integers if assignable partitions available, empty otherwise. 
*/ @Override public Set<Integer> assignablePartitions(Uuid topicId) { - TopicMetadata topic = this.topicMetadata.get(topicId); + TopicImage topic = metadataImage.topics().getTopic(topicId); if (topic == null) { return Set.of(); } - if (topicPartitionAllowedMap == null) { - return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet()); + if (topicPartitionAllowedMap.isEmpty()) { + return Set.copyOf(topic.partitions().keySet()); Review Comment: Do we really need to make a copy of the Set? The image is immutable, so we could just return an unmodifiable view of the key set instead. What do you think? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java: ########## @@ -245,22 +237,21 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { @Test public void testValidityAndBalanceForLargeSampleSet() { - Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>(); - for (int i = 1; i < 100; i++) { + MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder(); + Set<Uuid> topicIds = new HashSet<>(); + IntStream.range(1, 101).forEach(i -> { Review Comment: Is 1 to 101 correct? The original loop was `for (int i = 1; i < 100; i++)`, i.e. 99 iterations, whereas `IntStream.range(1, 101)` produces 100. It may be easier to keep the for loop. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -19,41 +19,42 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.metadata.PartitionRegistration; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain * topic and partition metadata for the topics that the modern group is subscribed to. */ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber { /** - * The topic Ids mapped to their corresponding {@link TopicMetadata} - * object, which contains topic and partition metadata. + * The map of topic Ids to the set of allowed partitions for each topic. + * If this is empty, all partitions are allowed. */ - private final Map<Uuid, TopicMetadata> topicMetadata; - private final Map<Uuid, Set<Integer>> topicPartitionAllowedMap; + private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap; - public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata) { - this(topicMetadata, null); - } + /** + * The metadata image that contains the latest metadata information. 
+ */ + private final MetadataImage metadataImage; - public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata, Map<Uuid, Set<Integer>> topicPartitionAllowedMap) { - this.topicMetadata = Objects.requireNonNull(topicMetadata); - this.topicPartitionAllowedMap = topicPartitionAllowedMap; + public SubscribedTopicDescriberImpl(MetadataImage metadataImage) { + this(metadataImage, Optional.empty()); } - /** - * Map of topic Ids to topic metadata. - * - * @return The map of topic Ids to topic metadata. - */ - public Map<Uuid, TopicMetadata> topicMetadata() { - return this.topicMetadata; + public SubscribedTopicDescriberImpl( + MetadataImage metadataImage, + Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap + ) { + this.metadataImage = Objects.requireNonNull(metadataImage); + this.topicPartitionAllowedMap = topicPartitionAllowedMap; Review Comment: nit: Should we wrap `topicPartitionAllowedMap` in `Objects.requireNonNull` too, as is done for `metadataImage`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java: ########## @@ -17,94 +17,111 @@ package org.apache.kafka.coordinator.group.modern; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.MetadataImageBuilder; +import org.apache.kafka.image.MetadataImage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class SubscribedTopicMetadataTest { - private Map<Uuid, TopicMetadata> topicMetadataMap; private SubscribedTopicDescriberImpl subscribedTopicMetadata; + private MetadataImage metadataImage; + private final int numPartitions = 5; @BeforeEach public void setUp() { - topicMetadataMap = new HashMap<>(); - for (int i = 0; i < 5; i++) { 
+ MetadataImageBuilder metadataImageBuilder = new MetadataImageBuilder(); + IntStream.range(0, 5).forEach(i -> { Review Comment: nit: Let's keep the for loop here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org