zzbennett commented on code in PR #20061: URL: https://github.com/apache/kafka/pull/20061#discussion_r2174712852
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1921,14 +1921,14 @@ private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare errorTopicResponses.add( new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() .setTopicId(topicData.topicId()) - .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId())) + .setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>")) .setErrorMessage(Errors.forCode(errItem.get().errorCode()).message()) .setErrorCode(errItem.get().errorCode()) ); } else { successTopics.put( topicData.topicId(), - metadataImage.topics().topicIdToNameView().get(topicData.topicId()) + metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>") Review Comment: this is setting the topic name here so I'm not sure if ZERO_UUID makes sense ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ########## @@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) { * 5. For each partition, write the partition ID and a sorted list of rack identifiers. * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators. * - * @param topicName The topic image. - * @param metadataImage The cluster image. + * @param topicName The topic name. + * @param metadataImage The topic metadata. * @return The hash of the topic. 
*/ - public static long computeTopicHash(String topicName, MetadataImage metadataImage) { - TopicImage topicImage = metadataImage.topics().getTopic(topicName); - if (topicImage == null) { + public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) { + Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName); + if (topicImage.isEmpty()) { return 0; } + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get(); + HashStream64 hasher = Hashing.xxh3_64().hashStream(); hasher = hasher .putByte(TOPIC_HASH_MAGIC_BYTE) - .putLong(topicImage.id().getMostSignificantBits()) - .putLong(topicImage.id().getLeastSignificantBits()) - .putString(topicImage.name()) - .putInt(topicImage.partitions().size()); - - ClusterImage clusterImage = metadataImage.cluster(); - List<String> racks = new ArrayList<>(); - for (int i = 0; i < topicImage.partitions().size(); i++) { + .putLong(topicMetadata.id().getMostSignificantBits()) + .putLong(topicMetadata.id().getLeastSignificantBits()) + .putString(topicMetadata.name()) + .putInt(topicMetadata.partitionCount()); + + Map<Integer, List<String>> racks = topicMetadata.partitionRacks(); + for (int i = 0; i < topicMetadata.partitionCount(); i++) { hasher = hasher.putInt(i); - racks.clear(); // Clear the list for reuse - for (int replicaId : topicImage.partitions().get(i).replicas) { - BrokerRegistration broker = clusterImage.broker(replicaId); - if (broker != null) { - broker.rack().ifPresent(racks::add); - } - } + List<String> partitionRacks = new ArrayList<>(racks.get(i)); + // topicMetadata returns an unmodifiable list + Collections.copy(partitionRacks, racks.get(i)); Review Comment: Sure I can make it a modifiable list. Making it unmodifiable is probably overkill since, like you said, we already make a copy in the `partitionRacks` method. 
If we avoid the extra copy, then the new implementation should be equivalent to the old but I'll double check with the jmh benchmarks ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ########## @@ -398,38 +390,35 @@ public static long computeGroupHash(Map<String, Long> topicHashes) { * 5. For each partition, write the partition ID and a sorted list of rack identifiers. * - Rack identifiers are formatted as "<length1><value1><length2><value2>" to prevent issues with simple separators. * - * @param topicName The topic image. - * @param metadataImage The cluster image. + * @param topicName The topic name. + * @param metadataImage The topic metadata. * @return The hash of the topic. */ - public static long computeTopicHash(String topicName, MetadataImage metadataImage) { - TopicImage topicImage = metadataImage.topics().getTopic(topicName); - if (topicImage == null) { + public static long computeTopicHash(String topicName, CoordinatorMetadataImage metadataImage) { + Optional<CoordinatorMetadataImage.TopicMetadata> topicImage = metadataImage.topicMetadata(topicName); + if (topicImage.isEmpty()) { return 0; } + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicImage.get(); + HashStream64 hasher = Hashing.xxh3_64().hashStream(); hasher = hasher .putByte(TOPIC_HASH_MAGIC_BYTE) - .putLong(topicImage.id().getMostSignificantBits()) - .putLong(topicImage.id().getLeastSignificantBits()) - .putString(topicImage.name()) - .putInt(topicImage.partitions().size()); - - ClusterImage clusterImage = metadataImage.cluster(); - List<String> racks = new ArrayList<>(); - for (int i = 0; i < topicImage.partitions().size(); i++) { + .putLong(topicMetadata.id().getMostSignificantBits()) + .putLong(topicMetadata.id().getLeastSignificantBits()) + .putString(topicMetadata.name()) + .putInt(topicMetadata.partitionCount()); + + Map<Integer, List<String>> racks = topicMetadata.partitionRacks(); Review Comment: Makes sense, that does make it a bit 
cleaner ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java: ########## @@ -66,22 +64,18 @@ public int numPartitions(Uuid topicId) { */ @Override public Set<String> racksForPartition(Uuid topicId, int partition) { - TopicImage topic = metadataImage.topics().getTopic(topicId); - if (topic != null) { - PartitionRegistration partitionRegistration = topic.partitions().get(partition); - if (partitionRegistration != null) { - Set<String> racks = new HashSet<>(); - for (int replica : partitionRegistration.replicas) { - // Only add the rack if it is available for the broker/replica. - BrokerRegistration brokerRegistration = metadataImage.cluster().broker(replica); - if (brokerRegistration != null) { - brokerRegistration.rack().ifPresent(racks::add); - } - } - return Collections.unmodifiableSet(racks); - } + Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOp = metadataImage.topicMetadata(topicId); + if (topicMetadataOp.isEmpty()) { + return Collections.emptySet(); + } + + CoordinatorMetadataImage.TopicMetadata topicMetadata = topicMetadataOp.get(); + List<String> racks = topicMetadata.partitionRacks().get(partition); Review Comment: yea agreed... ########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorMetadataImage.java: ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Provides metadata to Coordinators (GroupCoordinator, ShareCoordinator, etc) such as topics, partitions, and their configurations. + * Implementations should be thread-safe and immutable. + */ +public interface CoordinatorMetadataImage { + CoordinatorMetadataImage EMPTY = emptyImage(); + + Optional<String> topicName(Uuid id); + + Optional<Uuid> topicId(String topicName); + + default Optional<Integer> partitionCount(Uuid topicId) { + var topicName = topicName(topicId); + return topicName.isEmpty() ? Optional.empty() : partitionCount(topicName.get()); + } + + Optional<Integer> partitionCount(String topicName); + + Set<Uuid> topicIds(); + + Set<String> topicNames(); + + default Optional<TopicMetadata> topicMetadata(String topicName) { + var topicId = topicId(topicName); + return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get()); + } + + Optional<TopicMetadata> topicMetadata(Uuid topicId); + + boolean shareGroupsEnabled(); Review Comment: Yea it is weird. 
The share group coordinator relies on it in its `onNewMetadataImage` method https://github.com/apache/kafka/blob/trunk/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java#L1102 I just realized, though, that this particular instance of `onNewMetadataImage` is not part of any of the other coordinator-related interfaces, so I can easily just add the FeatureImage as an extra arg to the `ShareCoordinator` interface instead of handling it through the `CoordinatorMetadataImage`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -477,7 +482,7 @@ public void testMemberJoinsEmptyConsumerGroup() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(metadataImage) + .withMetadataImage(new KRaftCoordinatorMetadataImage(metadataImage)) Review Comment: I actually did at some point add a method to the MetadataImageBuilder to return a CoordinatorMetadataImage but I must have given up on updating the test to use the new method in all the places where it could be used. This test is really massive 😅 ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -5725,24 +5717,23 @@ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { // the subscription metadata (and the assignment) via the above mechanism. The // resolved regular expressions are cleaned up on the next refresh. if (!topicsDelta.createdTopicIds().isEmpty()) { - lastMetadataImageWithNewTopics = metadataImage.provenance().lastContainedOffset(); + lastMetadataImageWithNewTopics = metadataImage.version(); } // Notify all the groups subscribed to the created, updated or // deleted topics.
Set<String> allGroupIds = new HashSet<>(); - topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { - String topicName = topicDelta.name(); - // Remove topic hash from the cache to recalculate it. - topicHashCache.remove(topicName); - allGroupIds.addAll(groupsSubscribedToTopic(topicName)); - }); - topicsDelta.deletedTopicIds().forEach(topicId -> { - TopicImage topicImage = delta.image().topics().getTopic(topicId); - String topicName = topicImage.name(); - topicHashCache.remove(topicName); - allGroupIds.addAll(groupsSubscribedToTopic(topicName)); - }); + topicsDelta.changedTopicIds().forEach(topicId -> + metadataImage.topicName(topicId).ifPresent(topicName -> { + // Remove topic hash from the cache to recalculate it. + topicHashCache.remove(topicName); + allGroupIds.addAll(groupsSubscribedToTopic(topicName)); + })); + topicsDelta.deletedTopicIds().forEach(topicId -> + delta.image().topicName(topicId).ifPresent(topicName -> { Review Comment: Hm not sure. That is how the code was before but it makes more sense to use the `metadataImage` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org