zzbennett commented on code in PR #20061: URL: https://github.com/apache/kafka/pull/20061#discussion_r2179197521
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ########## @@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) { * 5. For each partition, write the partition ID and a sorted list of rack identifiers. * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators. * - * @param topicName The topic image. - * @param metadataImage The cluster image. + * @param topicName The topic name. + * @param metadataImage The topic metadata. * @return The hash of the topic. */ - public static long computeTopicHash(String topicName, MetadataImage metadataImage) { - TopicImage topicImage = metadataImage.topics().getTopic(topicName); - if (topicImage == null) { + public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) { + Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName); + if (topicImage.isEmpty()) { return 0; } + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get(); + HashStream64 hasher = Hashing.xxh3_64().hashStream(); hasher = hasher .putByte(TOPIC_HASH_MAGIC_BYTE) - .putLong(topicImage.id().getMostSignificantBits()) - .putLong(topicImage.id().getLeastSignificantBits()) - .putString(topicImage.name()) - .putInt(topicImage.partitions().size()); - - ClusterImage clusterImage = metadataImage.cluster(); - List<String> racks = new ArrayList<>(); - for (int i = 0; i < topicImage.partitions().size(); i++) { + .putLong(topicMetadata.id().getMostSignificantBits()) + .putLong(topicMetadata.id().getLeastSignificantBits()) + .putString(topicMetadata.name()) + .putInt(topicMetadata.partitionCount()); + + Map<Integer, List<String>> racks = topicMetadata.partitionRacks(); + for (int i = 0; i < topicMetadata.partitionCount(); i++) { hasher = hasher.putInt(i); - racks.clear(); // Clear the list for reuse - for (int replicaId : 
topicImage.partitions().get(i).replicas) { - BrokerRegistration broker = clusterImage.broker(replicaId); - if (broker != null) { - broker.rack().ifPresent(racks::add); - } - } + List<String> partitionRacks = new ArrayList<>(racks.get(i)); + // topicMetadata returns an unmodifiable list + Collections.copy(partitionRacks, racks.get(i)); Review Comment: I ran the group coordinator JMH benchmarks locally a couple of times and didn't see any statistically significant variation in performance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org