chia7712 commented on code in PR #19232:
URL: https://github.com/apache/kafka/pull/19232#discussion_r2095357396

##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and updates
+    // replace this value with a completely new one. This means reads (which are not under
+    // any lock) need to grab the value of this variable once, and retain that read copy for
+    // the duration of their operation. Multiple reads of this value risk getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] ").logger(KRaftMetadataCache.class);
+    }
+
+    /**
+     * Filter the alive replicas. It returns all brokers when filterUnavailableEndpoints is false.
+     * Otherwise, it filters the brokers that are fenced or do not have the listener.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of metadata requests,
+     * we should be careful about adding additional logic here.
+     * @param image The metadata image.
+     * @param brokers The list of brokers to filter.
+     * @param listenerName The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (!filterUnavailableEndpoints) return Replicas.toList(brokers);
+        List<Integer> res = new ArrayList<>(brokers.length);
+        for (int brokerId : brokers) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker != null && !broker.fenced() && broker.listeners().containsKey(listenerName.value())) {
+                res.add(brokerId);
+            }
+        }
+        return res;
+    }
+
+    public MetadataImage currentImage() {
+        return currentImage;
+    }
+
+    /**
+     * Get the partition metadata for the given topic and listener. If errorUnavailableEndpoints is true,
+     * it uses all brokers in the partitions. Otherwise, it filters the unavailable endpoints.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if the listener is missing on the broker.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE for broker unavailable.
+     *
+     * @param image The metadata image.
+     * @param topicName The name of the topic.
+     * @param listenerName The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND or LEADER_NOT_AVAILABLE.
+     */
+    private List<MetadataResponsePartition> getPartitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: listener {} not found on leader {}", topicName, partitionId, listenerName, partition.leader);
+                    error = errorUnavailableListeners ? Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            } else {
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: replica information not available for following brokers {}", topicName, partitionId, Arrays.stream(partition.replicas).filter(b -> !filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in sync replica information not available for following brokers {}", topicName, partitionId, Arrays.stream(partition.isr).filter(b -> !filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(maybeLeader.get().id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            }
+        }).toList();
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image The metadata image
+     * @param topicName The name of the topic.
+     * @param listenerName The listener name.
+     * @param startIndex The smallest index of the partitions to be included in the result.
+     *
+     * @return A collection of topic partition metadata and next partition index (-1 means
+     *         no next partition).
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> getPartitionMetadataForDescribeTopicResponse(

Review Comment:
   `getPartitionMetadataForDescribeTopicResponse` -> `partitionMetadataForDescribeTopicResponse`

##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> getPartitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new ArrayList<>();
+        final Set<Integer> partitions = topic.partitions().keySet();
+        final int upperIndex = Math.min(topic.partitions().size(), startIndex + maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
+            PartitionRegistration partition = topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                .setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitions.size()) ? upperIndex : -1);
+    }
+
+    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offlineReplicas = new ArrayList<>(0);
+        for (int brokerId : partition.replicas) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker == null || isReplicaOffline(partition, listenerName, broker)) {
+                offlineReplicas.add(brokerId);
+            }
+        }
+        return offlineReplicas;
+    }
+
+    private boolean isReplicaOffline(PartitionRegistration partition, ListenerName listenerName, BrokerRegistration broker) {
+        return broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition);
+    }
+
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, PartitionRegistration partition) {
+        return !broker.hasOnlineDir(partition.directory(broker.id()));
+    }
+
+    /**
+     * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+     * be added dynamically, so a broker with a missing listener could be a transient error.
+     *
+     * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, ListenerName listenerName) {
+        return image.cluster().broker(id) == null ? Optional.empty() :
+            image.cluster().broker(id).node(listenerName.value());
+    }
+
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
+            if (partitions.isEmpty()) return Stream.empty();
+            return Stream.of(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setTopicId(image.topics().getTopic(topic) == null ? Uuid.ZERO_UUID : image.topics().getTopic(topic).id())
+                .setIsInternal(Topic.isInternal(topic))
+                .setPartitions(partitions));
+        }).toList();
+    }
+
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionResponseEntry = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                AtomicBoolean breakLoop = new AtomicBoolean(false);
+                partitionResponse.ifPresent(partitions -> {
+                    DescribeTopicPartitionsResponseTopic response = new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+                        breakLoop.set(true);
+                    }
+                    remaining.addAndGet(-partitions.size());

Review Comment:
   We should not update `remaining` if `breakLoop` is true, right?
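
   A minimal sketch of that reordering, using the names from the snippet above (assuming the loop does break right after `breakLoop` is set, so `remaining` is never read again on that path):

   ```java
   if (nextPartition != -1) {
       result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
       breakLoop.set(true);
   } else {
       // Only charge the partition budget when iteration continues; once the
       // cursor is set the loop exits, so decrementing `remaining` is dead work.
       remaining.addAndGet(-partitions.size());
   }
   ```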

##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);

Review Comment:
   `getPartitionMetadata` -> `partitionMetadata`

##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionResponseEntry = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                AtomicBoolean breakLoop = new AtomicBoolean(false);
+                partitionResponse.ifPresent(partitions -> {

Review Comment:
   This lambda causes a lot of awkward code style. Maybe we can use `isPresent` and `get` to refactor it?
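
   A sketch of the `isPresent`/`get` shape this suggests, limited to the code visible in the quoted hunk. With no lambda capturing them, `remaining` could become a plain `int` and the `AtomicBoolean` flag a plain `break`; the branches outside this hunk (unknown-topic handling, the `remaining <= 0` case) are only marked as comments:

   ```java
   MetadataImage image = currentImage;
   int remaining = maximumNumberOfPartitions; // plain int: no lambda captures it anymore
   DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
   while (topics.hasNext()) {
       String topicName = topics.next();
       if (remaining > 0) {
           Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> entry =
               getPartitionMetadataForDescribeTopicResponse(
                   image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining);
           if (entry.getKey().isPresent()) {
               List<DescribeTopicPartitionsResponsePartition> partitions = entry.getKey().get();
               int nextPartition = entry.getValue();
               result.topics().add(new DescribeTopicPartitionsResponseTopic()
                   .setErrorCode(Errors.NONE.code())
                   .setName(topicName)
                   .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
                   .setIsInternal(Topic.isInternal(topicName))
                   .setPartitions(partitions));
               if (nextPartition != -1) {
                   result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
                   break; // a plain break replaces the breakLoop flag
               }
               remaining -= partitions.size();
           }
           // else: unknown-topic handling is outside the quoted hunk
       }
       // the remaining <= 0 branch is also outside the quoted hunk
   }
   ```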