zzbennett commented on code in PR #20061:
URL: https://github.com/apache/kafka/pull/20061#discussion_r2179197521


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##########
@@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) {
      * 5. For each partition, write the partition ID and a sorted list of rack identifiers.
      * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators.
      *
-     * @param topicName     The topic image.
-     * @param metadataImage The cluster image.
+     * @param topicName The topic name.
+     * @param metadataImage The topic metadata.
      * @return The hash of the topic.
      */
-    public static long computeTopicHash(String topicName, MetadataImage metadataImage) {
-        TopicImage topicImage = metadataImage.topics().getTopic(topicName);
-        if (topicImage == null) {
+    public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) {
+        Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName);
+        if (topicImage.isEmpty()) {
             return 0;
         }
 
+        CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get();
+
         HashStream64 hasher = Hashing.xxh3_64().hashStream();
         hasher = hasher
             .putByte(TOPIC_HASH_MAGIC_BYTE)
-            .putLong(topicImage.id().getMostSignificantBits())
-            .putLong(topicImage.id().getLeastSignificantBits())
-            .putString(topicImage.name())
-            .putInt(topicImage.partitions().size());
-
-        ClusterImage clusterImage = metadataImage.cluster();
-        List<String> racks = new ArrayList<>();
-        for (int i = 0; i < topicImage.partitions().size(); i++) {
+            .putLong(topicMetadata.id().getMostSignificantBits())
+            .putLong(topicMetadata.id().getLeastSignificantBits())
+            .putString(topicMetadata.name())
+            .putInt(topicMetadata.partitionCount());
+
+        Map<Integer, List<String>> racks = topicMetadata.partitionRacks();
+        for (int i = 0; i < topicMetadata.partitionCount(); i++) {
             hasher = hasher.putInt(i);
-            racks.clear(); // Clear the list for reuse
-            for (int replicaId : topicImage.partitions().get(i).replicas) {
-                BrokerRegistration broker = clusterImage.broker(replicaId);
-                if (broker != null) {
-                    broker.rack().ifPresent(racks::add);
-                }
-            }
+            List<String> partitionRacks = new ArrayList<>(racks.get(i));
+            // topicMetadata returns an unmodifiable list
+            Collections.copy(partitionRacks, racks.get(i));

Review Comment:
   I ran the group coordinator JMH benchmarks locally a couple of times and didn't 
   see any statistically significant variation in performance.
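
   For context, the per-partition copy this comment refers to can be sketched in isolation as below. This is a minimal illustration, not the code from the PR: the class name `RackCopySketch` and both helper methods are hypothetical, and the length-prefixed format simply follows the `<length1><value1><length2><value2>` description in the Javadoc above. It assumes only that the rack list handed back by the metadata is unmodifiable, as the diff's inline comment states.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical standalone sketch; none of these names come from the Kafka code base.
public class RackCopySketch {

    // Copy an unmodifiable rack list into a mutable one so it can be sorted.
    // The ArrayList copy constructor already yields an independent, mutable copy
    // holding every element of the source list.
    static List<String> sortedMutableCopy(List<String> unmodifiableRacks) {
        List<String> copy = new ArrayList<>(unmodifiableRacks);
        Collections.sort(copy);
        return copy;
    }

    // Format racks as <length1><value1><length2><value2>..., per the Javadoc,
    // so that no separator character can collide with a rack name.
    static String lengthPrefixed(List<String> racks) {
        StringBuilder sb = new StringBuilder();
        for (String rack : racks) {
            sb.append(rack.length()).append(rack);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // List.of returns an unmodifiable list, standing in for the metadata's rack list.
        List<String> racks = List.of("rack-b", "rack-a");
        List<String> sorted = sortedMutableCopy(racks);
        System.out.println(sorted + " -> " + lengthPrefixed(sorted));
        // prints: [rack-a, rack-b] -> 6rack-a6rack-b
    }
}
```

   The cost of this sketch is one small allocation per partition, which is the kind of overhead the benchmark run above was checking for.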



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
