zzbennett commented on code in PR #20061:
URL: https://github.com/apache/kafka/pull/20061#discussion_r2174712852


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1921,14 +1921,14 @@ private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare
                 errorTopicResponses.add(
                     new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
                         .setTopicId(topicData.topicId())
-                        .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+                        .setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>"))
                         .setErrorMessage(Errors.forCode(errItem.get().errorCode()).message())
                         .setErrorCode(errItem.get().errorCode())
                 );
             } else {
                 successTopics.put(
                     topicData.topicId(),
-                    metadataImage.topics().topicIdToNameView().get(topicData.topicId())
+                    metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>")

Review Comment:
   This sets the topic name here, so I'm not sure `ZERO_UUID` makes sense.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) {
      * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
      * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
      *
-     * @param topicName     The topic image.
-     * @param metadataImage The cluster image.
+     * @param topicName The topic name.
+     * @param metadataImage The topic metadata.
      * @return The hash of the topic.
      */
-    public static long computeTopicHash(String topicName, MetadataImage metadataImage) {
-        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
-        if (topicImage == null) {
+    public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) {
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName);
+        if (topicImage.isEmpty()) {
             return 0;
         }
 
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get();
+
         HashStream64 hasher = Hashing.xxh3_64().hashStream();
         hasher = hasher
             .putByte(TOPIC_HASH_MAGIC_BYTE)
-            .putLong(topicImage.id().getMostSignificantBits())
-            .putLong(topicImage.id().getLeastSignificantBits())
-            .putString(topicImage.name())
-            .putInt(topicImage.partitions().size());
-
-        ClusterImage clusterImage = metadataImage.cluster();
-        List<String> racks = new ArrayList<>();
-        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            .putLong(topicMetadata.id().getMostSignificantBits())
+            .putLong(topicMetadata.id().getLeastSignificantBits())
+            .putString(topicMetadata.name())
+            .putInt(topicMetadata.partitionCount());
+
+        Map<Integer, List<String>> racks = topicMetadata.partitionRacks();
+        for (int i = 0; i < topicMetadata.partitionCount(); i++) {
             hasher = hasher.putInt(i);
-            racks.clear(); // Clear the list for reuse
-            for (int replicaId : topicImage.partitions().get(i).replicas) {
-                BrokerRegistration broker = clusterImage.broker(replicaId);
-                if (broker != null) {
-                    broker.rack().ifPresent(racks::add);
-                }
-            }
+            List<String> partitionRacks = new ArrayList<>(racks.get(i));
+            // topicMetadata returns an unmodifiable list
+            Collections.copy(partitionRacks, racks.get(i));

Review Comment:
   Sure, I can make it a modifiable list. Making it unmodifiable is probably 
overkill since, like you said, we already make a copy in the `partitionRacks` 
method. If we avoid the extra copy, the new implementation should be 
equivalent to the old one, but I'll double-check with the JMH benchmarks.
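
   For reference, a minimal sketch of the copy-then-sort pattern under discussion, using only `java.util`. The class and method names (`RackCopySketch`, `sortedCopy`) are hypothetical and not part of the PR. It assumes the rack list handed out by the image is unmodifiable and has to be sorted before hashing, as the Javadoc above describes. It also shows that the `ArrayList` copy constructor already copies every element, so a follow-up `Collections.copy` adds nothing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RackCopySketch {
    // Returns a sorted, mutable copy of the given racks.
    // The input may be unmodifiable and is left untouched.
    static List<String> sortedCopy(List<String> racks) {
        // The copy constructor copies all elements, so calling
        // Collections.copy(copy, racks) afterwards would be redundant.
        List<String> copy = new ArrayList<>(racks);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> racks = List.of("rack-b", "rack-a"); // unmodifiable list
        System.out.println(sortedCopy(racks)); // prints [rack-a, rack-b]
    }
}
```

   If `partitionRacks` returned a fresh modifiable list per call, the caller could sort it in place and skip this copy entirely, which is the variant to compare in the benchmarks.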



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) {
      * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
      * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
      *
-     * @param topicName     The topic image.
-     * @param metadataImage The cluster image.
+     * @param topicName The topic name.
+     * @param metadataImage The topic metadata.
      * @return The hash of the topic.
      */
-    public static long computeTopicHash(String topicName, MetadataImage metadataImage) {
-        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
-        if (topicImage == null) {
+    public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) {
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName);
+        if (topicImage.isEmpty()) {
             return 0;
         }
 
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get();
+
         HashStream64 hasher = Hashing.xxh3_64().hashStream();
         hasher = hasher
             .putByte(TOPIC_HASH_MAGIC_BYTE)
-            .putLong(topicImage.id().getMostSignificantBits())
-            .putLong(topicImage.id().getLeastSignificantBits())
-            .putString(topicImage.name())
-            .putInt(topicImage.partitions().size());
-
-        ClusterImage clusterImage = metadataImage.cluster();
-        List<String> racks = new ArrayList<>();
-        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            .putLong(topicMetadata.id().getMostSignificantBits())
+            .putLong(topicMetadata.id().getLeastSignificantBits())
+            .putString(topicMetadata.name())
+            .putInt(topicMetadata.partitionCount());
+
+        Map<Integer, List<String>> racks = topicMetadata.partitionRacks();

Review Comment:
   Makes sense, that does make it a bit cleaner.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java:
##########
@@ -66,22 +64,18 @@ public int numPartitions(Uuid topicId) {
      */
     @Override
     public Set<String> racksForPartition(Uuid topicId, int partition) {
-        TopicImage topic = metadataImage.topics().getTopic(topicId);
-        if (topic != null) {
-            PartitionRegistration partitionRegistration = topic.partitions().get(partition);
-            if (partitionRegistration != null) {
-                Set<String> racks = new HashSet<>();
-                for (int replica : partitionRegistration.replicas) {
-                    // Only add the rack if it is available for the broker/replica.
-                    BrokerRegistration brokerRegistration = metadataImage.cluster().broker(replica);
-                    if (brokerRegistration != null) {
-                        brokerRegistration.rack().ifPresent(racks::add);
-                    }
-                }
-                return Collections.unmodifiableSet(racks);
-            }
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOp = metadataImage.topicMetadata(topicId);
+        if (topicMetadataOp.isEmpty()) {
+            return Collections.emptySet();
+        }
+
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = topicMetadataOp.get();
+        List<String> racks = topicMetadata.partitionRacks().get(partition);

Review Comment:
   Yeah, agreed.



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) such as topics, partitions, and their configurations.
+ * Implementations should be thread-safe and immutable.
+ */
+public interface CoordinatorMetadataImage {
+    CoordinatorMetadataImage EMPTY = emptyImage();
+
+    Optional<String> topicName(Uuid id);
+
+    Optional<Uuid> topicId(String topicName);
+
+    default Optional<Integer> partitionCount(Uuid topicId) {
+        var topicName = topicName(topicId);
+        return topicName.isEmpty() ? Optional.empty() : partitionCount(topicName.get());
+    }
+
+    Optional<Integer> partitionCount(String topicName);
+
+    Set<Uuid> topicIds();
+
+    Set<String> topicNames();
+
+    default Optional<TopicMetadata> topicMetadata(String topicName) {
+        var topicId = topicId(topicName);
+        return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get());
+    }
+
+    Optional<TopicMetadata> topicMetadata(Uuid topicId);
+
+    boolean shareGroupsEnabled();

Review Comment:
   Yeah, it is weird. The share group coordinator relies on it in its 
`onNewMetadataImage` method: 
https://github.com/apache/kafka/blob/trunk/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java#L1102

   I just realized, though, that this particular `onNewMetadataImage` is not 
part of any of the other coordinator-related interfaces, so I can simply add 
the `FeatureImage` as an extra argument to the `ShareCoordinator` interface 
instead of handling it through `CoordinatorMetadataImage`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -477,7 +482,7 @@ public void testMemberJoinsEmptyConsumerGroup() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
             .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
-            .withMetadataImage(metadataImage)
+            .withMetadataImage(new KRaftCoordinatorMetadataImage(metadataImage))

Review Comment:
   I did at some point add a method to the `MetadataImageBuilder` that returns 
a `CoordinatorMetadataImage`, but I must have given up on updating the test to 
use it in all the places where it could. This test is really massive 😅 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -5725,24 +5717,23 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
         // the subscription metadata (and the assignment) via the above mechanism. The
         // resolved regular expressions are cleaned up on the next refresh.
         if (!topicsDelta.createdTopicIds().isEmpty()) {
-            lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset();
+            lastMetadataImageWithNewTopics = metadataImage.version();
         }
 
         // Notify all the groups subscribed to the created, updated or
         // deleted topics.
         Set<String> allGroupIds = new HashSet<>();
-        topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
-            String topicName = topicDelta.name();
-            // Remove topic hash from the cache to recalculate it.
-            topicHashCache.remove(topicName);
-            allGroupIds.addAll(groupsSubscribedToTopic(topicName));
-        });
-        topicsDelta.deletedTopicIds().forEach(topicId -> {
-            TopicImage topicImage = delta.image().topics().getTopic(topicId);
-            String topicName = topicImage.name();
-            topicHashCache.remove(topicName);
-            allGroupIds.addAll(groupsSubscribedToTopic(topicName));
-        });
+        topicsDelta.changedTopicIds().forEach(topicId ->
+            metadataImage.topicName(topicId).ifPresent(topicName -> {
+                // Remove topic hash from the cache to recalculate it.
+                topicHashCache.remove(topicName);
+                allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+            }));
+        topicsDelta.deletedTopicIds().forEach(topicId ->
+            delta.image().topicName(topicId).ifPresent(topicName -> {

Review Comment:
   Hm, not sure. That is how the code was before, but it makes more sense to 
use the `metadataImage`.
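
   One thing worth confirming before switching: with the `ifPresent` style used in this hunk, a lookup against an image that no longer contains a deleted topic is skipped silently. Below is a minimal sketch of that behavior. It uses plain maps as hypothetical stand-ins for the two images and hypothetical names (`DeletedTopicLookupSketch`, `resolve`), not the real `CoordinatorMetadataImage` API. Which of `metadataImage` and `delta.image()` still contains the deleted topics at this point in `onNewMetadataImage` is an assumption to verify against the actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class DeletedTopicLookupSketch {
    // Hypothetical stand-in for an image's topic id -> name lookup.
    static Optional<String> topicName(Map<String, String> image, String topicId) {
        return Optional.ofNullable(image.get(topicId));
    }

    // Resolves deleted topic ids to names using the given image.
    // Ids the image does not know are skipped without any error.
    static List<String> resolve(Map<String, String> image, List<String> deletedIds) {
        List<String> names = new ArrayList<>();
        for (String id : deletedIds) {
            topicName(image, id).ifPresent(names::add);
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> before = Map.of("id-1", "orders", "id-2", "payments");
        Map<String, String> after = Map.of("id-1", "orders"); // id-2 was deleted
        List<String> deleted = List.of("id-2");

        System.out.println(resolve(before, deleted)); // prints [payments]
        System.out.println(resolve(after, deleted));  // prints []
    }
}
```

   In the second call no name is resolved, so in the real code the hash cache entry and the subscribed groups for that topic would not be touched.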



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org