chia7712 commented on code in PR #19232:
URL: https://github.com/apache/kafka/pull/19232#discussion_r2095357396


##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.Cursor;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
+import org.apache.kafka.server.common.FinalizedFeatures;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class KRaftMetadataCache implements MetadataCache {
+    private final Logger log;
+    private final Supplier<KRaftVersion> kraftVersionSupplier;
+
+    // This is the cache state. Every MetadataImage instance is immutable, and updates
+    // replace this value with a completely new one. This means reads (which are not under
+    // any lock) need to grab the value of this variable once, and retain that read copy for
+    // the duration of their operation. Multiple reads of this value risk getting different
+    // image values.
+    private volatile MetadataImage currentImage = MetadataImage.EMPTY;
+
+    public KRaftMetadataCache(int brokerId, Supplier<KRaftVersion> kraftVersionSupplier) {
+        this.kraftVersionSupplier = kraftVersionSupplier;
+        this.log = new LogContext("[MetadataCache brokerId=" + brokerId + "] ").logger(KRaftMetadataCache.class);
+    }
+
+    /**
+     * Filter the alive replicas. It returns all brokers when filterUnavailableEndpoints is false.
+     * Otherwise, it filters out the brokers that are fenced or do not have the listener.
+     * <p>
+     * This method is the main hotspot when it comes to the performance of metadata requests,
+     * so we should be careful about adding additional logic here.
+     * @param image                      The metadata image.
+     * @param brokers                    The list of brokers to filter.
+     * @param listenerName               The listener name.
+     * @param filterUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     */
+    private List<Integer> maybeFilterAliveReplicas(
+        MetadataImage image,
+        int[] brokers,
+        ListenerName listenerName,
+        boolean filterUnavailableEndpoints
+    ) {
+        if (!filterUnavailableEndpoints) return Replicas.toList(brokers);
+        List<Integer> res = new ArrayList<>(brokers.length);
+        for (int brokerId : brokers) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker != null && !broker.fenced() && broker.listeners().containsKey(listenerName.value())) {
+                res.add(brokerId);
+            }
+        }
+        return res;
+    }
+
+    public MetadataImage currentImage() {
+        return currentImage;
+    }
+
+    /**
+     * Get the partition metadata for the given topic and listener. If errorUnavailableEndpoints is false,
+     * it uses all brokers in the partitions. Otherwise, it filters out the unavailable endpoints.
+     * If errorUnavailableListeners is true, it returns LISTENER_NOT_FOUND if the listener is missing on the broker.
+     * Otherwise, it returns LEADER_NOT_AVAILABLE if the broker is unavailable.
+     *
+     * @param image                     The metadata image.
+     * @param topicName                 The name of the topic.
+     * @param listenerName              The listener name.
+     * @param errorUnavailableEndpoints Whether to filter the unavailable endpoints. This field is to support v0 MetadataResponse.
+     * @param errorUnavailableListeners Whether to return LISTENER_NOT_FOUND or LEADER_NOT_AVAILABLE.
+     */
+    private List<MetadataResponsePartition> getPartitionMetadata(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        TopicImage topicImage = image.topics().getTopic(topicName);
+        if (topicImage == null) return List.of();
+        return topicImage.partitions().entrySet().stream().map(entry -> {
+            int partitionId = entry.getKey();
+            PartitionRegistration partition = entry.getValue();
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, errorUnavailableEndpoints);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, errorUnavailableEndpoints);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            Errors error;
+            if (maybeLeader.isEmpty()) {
+                if (!image.cluster().brokers().containsKey(partition.leader)) {
+                    log.debug("Error while fetching metadata for {}-{}: leader not available", topicName, partitionId);
+                    error = Errors.LEADER_NOT_AVAILABLE;
+                } else {
+                    log.debug("Error while fetching metadata for {}-{}: listener {} not found on leader {}", topicName, partitionId, listenerName, partition.leader);
+                    error = errorUnavailableListeners ? Errors.LISTENER_NOT_FOUND : Errors.LEADER_NOT_AVAILABLE;
+                }
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            } else {
+                if (filteredReplicas.size() < partition.replicas.length) {
+                    log.debug("Error while fetching metadata for {}-{}: replica information not available for following brokers {}", topicName, partitionId, Arrays.stream(partition.replicas).filter(b -> !filteredReplicas.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else if (filteredIsr.size() < partition.isr.length) {
+                    log.debug("Error while fetching metadata for {}-{}: in sync replica information not available for following brokers {}", topicName, partitionId, Arrays.stream(partition.isr).filter(b -> !filteredIsr.contains(b)).mapToObj(String::valueOf).collect(Collectors.joining(",")));
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                    error = Errors.REPLICA_NOT_AVAILABLE;
+                } else {
+                    error = Errors.NONE;
+                }
+                return new MetadataResponsePartition()
+                    .setErrorCode(error.code())
+                    .setPartitionIndex(partitionId)
+                    .setLeaderId(maybeLeader.get().id())
+                    .setLeaderEpoch(partition.leaderEpoch)
+                    .setReplicaNodes(filteredReplicas)
+                    .setIsrNodes(filteredIsr)
+                    .setOfflineReplicas(offlineReplicas);
+            }
+        }).toList();
+    }
+
+    /**
+     * Return topic partition metadata for the given topic, listener and index range. Also, return the next partition
+     * index that is not included in the result.
+     *
+     * @param image                       The metadata image.
+     * @param topicName                   The name of the topic.
+     * @param listenerName                The listener name.
+     * @param startIndex                  The smallest index of the partitions to be included in the result.
+     * @param maxCount                    The maximum number of partitions to be included in the result.
+     *
+     * @return                            A collection of topic partition metadata and the next partition index (-1 means
+     *                                    no next partition).
+     */
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> getPartitionMetadataForDescribeTopicResponse(

Review Comment:
   `getPartitionMetadataForDescribeTopicResponse` -> `partitionMetadataForDescribeTopicResponse`
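
   For reference, a minimal sketch of what the suggested rename would look like at the
   declaration (hypothetical; assuming there are no callers outside this class, the body
   is unchanged and only the `get` prefix is dropped):

   ```java
   // Sketch of the rename suggested above; the method body stays as in the PR.
   private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionMetadataForDescribeTopicResponse(
       MetadataImage image,
       String topicName,
       ListenerName listenerName,
       int startIndex,
       int maxCount
   ) {
       // ... body as in the PR ...
   }
   ```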



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+    private Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> getPartitionMetadataForDescribeTopicResponse(
+        MetadataImage image,
+        String topicName,
+        ListenerName listenerName,
+        int startIndex,
+        int maxCount
+    ) {
+        TopicImage topic = image.topics().getTopic(topicName);
+        if (topic == null) return Map.entry(Optional.empty(), -1);
+        List<DescribeTopicPartitionsResponsePartition> result = new ArrayList<>();
+        final Set<Integer> partitions = topic.partitions().keySet();
+        final int upperIndex = Math.min(topic.partitions().size(), startIndex + maxCount);
+        for (int partitionId = startIndex; partitionId < upperIndex; partitionId++) {
+            PartitionRegistration partition = topic.partitions().get(partitionId);
+            if (partition == null) {
+                log.warn("The partition {} does not exist for {}", partitionId, topicName);
+                continue;
+            }
+            List<Integer> filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas, listenerName, false);
+            List<Integer> filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName, false);
+            List<Integer> offlineReplicas = getOfflineReplicas(image, partition, listenerName);
+            Optional<Node> maybeLeader = getAliveEndpoint(image, partition.leader, listenerName);
+            result.add(new DescribeTopicPartitionsResponsePartition()
+                .setPartitionIndex(partitionId)
+                .setLeaderId(maybeLeader.map(Node::id).orElse(MetadataResponse.NO_LEADER_ID))
+                .setLeaderEpoch(partition.leaderEpoch)
+                .setReplicaNodes(filteredReplicas)
+                .setIsrNodes(filteredIsr)
+                .setOfflineReplicas(offlineReplicas)
+                .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
+                .setLastKnownElr(Replicas.toList(partition.lastKnownElr)));
+        }
+        return Map.entry(Optional.of(result), (upperIndex < partitions.size()) ? upperIndex : -1);
+    }
+
+    private List<Integer> getOfflineReplicas(MetadataImage image, PartitionRegistration partition, ListenerName listenerName) {
+        List<Integer> offlineReplicas = new ArrayList<>(0);
+        for (int brokerId : partition.replicas) {
+            BrokerRegistration broker = image.cluster().broker(brokerId);
+            if (broker == null || isReplicaOffline(partition, listenerName, broker)) {
+                offlineReplicas.add(brokerId);
+            }
+        }
+        return offlineReplicas;
+    }
+
+    private boolean isReplicaOffline(PartitionRegistration partition, ListenerName listenerName, BrokerRegistration broker) {
+        return broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isReplicaInOfflineDir(broker, partition);
+    }
+
+    private boolean isReplicaInOfflineDir(BrokerRegistration broker, PartitionRegistration partition) {
+        return !broker.hasOnlineDir(partition.directory(broker.id()));
+    }
+
+    /**
+     * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+     * be added dynamically, so a broker with a missing listener could be a transient error.
+     *
+     * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+     */
+    private Optional<Node> getAliveEndpoint(MetadataImage image, int id, ListenerName listenerName) {
+        return image.cluster().broker(id) == null ? Optional.empty() :
+            image.cluster().broker(id).node(listenerName.value());
+    }
+
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
+            if (partitions.isEmpty()) return Stream.empty();
+            return Stream.of(new MetadataResponseTopic()
+                .setErrorCode(Errors.NONE.code())
+                .setName(topic)
+                .setTopicId(image.topics().getTopic(topic) == null ? Uuid.ZERO_UUID : image.topics().getTopic(topic).id())
+                .setIsInternal(Topic.isInternal(topic))
+                .setPartitions(partitions));
+        }).toList();
+    }
+
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionResponseEntry = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                AtomicBoolean breakLoop = new AtomicBoolean(false);
+                partitionResponse.ifPresent(partitions -> {
+                    DescribeTopicPartitionsResponseTopic response = new DescribeTopicPartitionsResponseTopic()
+                        .setErrorCode(Errors.NONE.code())
+                        .setName(topicName)
+                        .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
+                        .setIsInternal(Topic.isInternal(topicName))
+                        .setPartitions(partitions);
+                    result.topics().add(response);
+
+                    if (nextPartition != -1) {
+                        result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
+                        breakLoop.set(true);
+                    }
+                    remaining.addAndGet(-partitions.size());

Review Comment:
   we should not update `remaining` if `breakLoop` is true, right?
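
   If I follow the control flow, the extra decrement is harmless today, assuming the
   unshown tail of the loop breaks as soon as `breakLoop` is set, since `remaining` is
   never read again after that. Still, guarding it would make the intent explicit. A
   hypothetical sketch of the guarded version:

   ```java
   if (nextPartition != -1) {
       result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(nextPartition));
       breakLoop.set(true);
   } else {
       // Only charge the partition budget when the loop will keep iterating.
       remaining.addAndGet(-partitions.size());
   }
   ```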



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+    @Override
+    public List<MetadataResponseTopic> getTopicMetadata(
+        Set<String> topics,
+        ListenerName listenerName,
+        boolean errorUnavailableEndpoints,
+        boolean errorUnavailableListeners
+    ) {
+        MetadataImage image = currentImage;
+        return topics.stream().flatMap(topic -> {
+            List<MetadataResponsePartition> partitions = getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);

Review Comment:
   `getPartitionMetadata` -> `partitionMetadata`
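
   A quick sketch of how this call site would read after the rename (hypothetical,
   assuming the private declaration is renamed to match):

   ```java
   List<MetadataResponsePartition> partitions =
       partitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners);
   ```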



##########
metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java:
##########
@@ -0,0 +1,513 @@
+    @Override
+    public DescribeTopicPartitionsResponseData describeTopicResponse(
+        Iterator<String> topics,
+        ListenerName listenerName,
+        Function<String, Integer> topicPartitionStartIndex,
+        int maximumNumberOfPartitions,
+        boolean ignoreTopicsWithExceptions
+    ) {
+        MetadataImage image = currentImage;
+        AtomicInteger remaining = new AtomicInteger(maximumNumberOfPartitions);
+        DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
+        while (topics.hasNext()) {
+            String topicName = topics.next();
+            if (remaining.get() > 0) {
+                Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> partitionResponseEntry = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName, topicPartitionStartIndex.apply(topicName), remaining.get());
+                Optional<List<DescribeTopicPartitionsResponsePartition>> partitionResponse = partitionResponseEntry.getKey();
+                int nextPartition = partitionResponseEntry.getValue();
+                AtomicBoolean breakLoop = new AtomicBoolean(false);
+                partitionResponse.ifPresent(partitions -> {

Review Comment:
   This lambda forces a lot of awkward code style. Maybe we can use `isPresent` and `get` to refactor it?
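
   A hypothetical sketch of that refactor, assuming the unshown tail of the loop only
   breaks when the cursor is set and skips topics with an empty response. With
   `isPresent`/`get`, plain locals and `break` can replace the `AtomicBoolean`/
   `AtomicInteger` workarounds that the lambda forces:

   ```java
   int remaining = maximumNumberOfPartitions;  // plain int instead of AtomicInteger
   DescribeTopicPartitionsResponseData result = new DescribeTopicPartitionsResponseData();
   while (topics.hasNext()) {
       String topicName = topics.next();
       if (remaining > 0) {
           Entry<Optional<List<DescribeTopicPartitionsResponsePartition>>, Integer> entry =
               getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName,
                   topicPartitionStartIndex.apply(topicName), remaining);
           if (entry.getKey().isPresent()) {
               List<DescribeTopicPartitionsResponsePartition> partitions = entry.getKey().get();
               result.topics().add(new DescribeTopicPartitionsResponseTopic()
                   .setErrorCode(Errors.NONE.code())
                   .setName(topicName)
                   .setTopicId(Optional.ofNullable(image.topics().getTopic(topicName).id()).orElse(Uuid.ZERO_UUID))
                   .setIsInternal(Topic.isInternal(topicName))
                   .setPartitions(partitions));
               if (entry.getValue() != -1) {
                   // A cursor means the partition budget was exhausted mid-topic; stop here.
                   result.setNextCursor(new Cursor().setTopicName(topicName).setPartitionIndex(entry.getValue()));
                   break;  // replaces breakLoop.set(true)
               }
               remaining -= partitions.size();
           }
           // ... handling of the error/exhausted cases is not shown in this hunk ...
       }
   }
   ```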



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
