chia7712 commented on code in PR #18801: URL: https://github.com/apache/kafka/pull/18801#discussion_r1973109296
########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -345,68 +352,75 @@ class KRaftMetadataCache( Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1 } - override def getAliveBrokers(): Iterable[BrokerMetadata] = getAliveBrokers(_currentImage) + override def getAliveBrokers(): util.List[BrokerMetadata] = getAliveBrokers(_currentImage) - private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = { - image.cluster().brokers().values().asScala.filterNot(_.fenced()). - map(b => new BrokerMetadata(b.id, b.rack)) + private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = { + _currentImage.cluster().brokers().values().stream() Review Comment: `_currentImage` -> `image` ########## metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata; + +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeClientQuotasRequestData; +import org.apache.kafka.common.message.DescribeClientQuotasResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.FinalizedFeatures; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; + +public interface MetadataCache extends ConfigRepository { + + /** + * Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details + * on the use of the two boolean flags. + * + * @param topics The set of topics. + * @param listenerName The listener name. + * @param errorUnavailableEndpoints If true, we return an error on unavailable brokers. This is used to support + * MetadataResponse version 0. + * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE if the listener is not found on the leader. + * This is used for MetadataResponse versions 0-5. + * @return A collection of topic metadata. + */ + List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata( + Set<String> topics, + ListenerName listenerName, + boolean errorUnavailableEndpoints, + boolean errorUnavailableListeners); + + Set<String> getAllTopics(); + + Set<TopicPartition> getTopicPartitions(String topicName); + + boolean hasAliveBroker(int brokerId); + + List<BrokerMetadata> getAliveBrokers(); + + Optional<Long> getAliveBrokerEpoch(int brokerId); + + boolean isBrokerFenced(int brokerId); + + boolean isBrokerShuttingDown(int brokerId); + + Uuid getTopicId(String topicName); + + Optional<String> getTopicName(Uuid topicId); + + Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName); + + List<Node> getAliveBrokerNodes(ListenerName listenerName); + + List<Node> getBrokerNodes(ListenerName listenerName); + + Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId); + + /** + * Return the number of partitions in the given topic, or None if the given topic does not exist. + */ + Optional<Integer> numPartitions(String topic); + + Map<String, Uuid> topicNamesToIds(); Review Comment: it is useless - maybe we can remove it to simplify the interface. ########## metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata; + +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeClientQuotasRequestData; +import org.apache.kafka.common.message.DescribeClientQuotasResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.FinalizedFeatures; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; + +public interface MetadataCache extends ConfigRepository { + + /** + * Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details + * on the use of the two boolean flags. + * + * @param topics The set of topics. + * @param listenerName The listener name. + * @param errorUnavailableEndpoints If true, we return an error on unavailable brokers. This is used to support + * MetadataResponse version 0. + * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE if the listener is not found on the leader. + * This is used for MetadataResponse versions 0-5. + * @return A collection of topic metadata. + */ + List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata( + Set<String> topics, + ListenerName listenerName, + boolean errorUnavailableEndpoints, + boolean errorUnavailableListeners); + + Set<String> getAllTopics(); + + Set<TopicPartition> getTopicPartitions(String topicName); + + boolean hasAliveBroker(int brokerId); + + List<BrokerMetadata> getAliveBrokers(); + + Optional<Long> getAliveBrokerEpoch(int brokerId); + + boolean isBrokerFenced(int brokerId); + + boolean isBrokerShuttingDown(int brokerId); + + Uuid getTopicId(String topicName); + + Optional<String> getTopicName(Uuid topicId); + + Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName); + + List<Node> getAliveBrokerNodes(ListenerName listenerName); + + List<Node> getBrokerNodes(ListenerName listenerName); + + Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId); + + /** + * Return the number of partitions in the given topic, or None if the given topic does not exist. + */ + Optional<Integer> numPartitions(String topic); + + Map<String, Uuid> topicNamesToIds(); + + Map<Uuid, String> topicIdsToNames(); + + Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo(); Review Comment: ditto ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -239,35 +239,37 @@ class KRaftMetadataCache( } // errorUnavailableEndpoints exists to support v0 MetadataResponses - override def getTopicMetadata(topics: Set[String], + override def getTopicMetadata(topics: util.Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, - errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { + errorUnavailableListeners: Boolean = false): util.List[MetadataResponseTopic] = { val image = _currentImage - topics.toSeq.flatMap { topic => - getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => - new MetadataResponseTopic() - .setErrorCode(Errors.NONE.code) - .setName(topic) - .setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID)) - .setIsInternal(Topic.isInternal(topic)) - .setPartitions(partitionMetadata.toBuffer.asJava) + topics.stream().flatMap(topic => + getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) match { + case Some(partitionMetadata) => + util.stream.Stream.of(new MetadataResponseTopic() + .setErrorCode(Errors.NONE.code) + .setName(topic) + .setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID)) + .setIsInternal(Topic.isInternal(topic)) + .setPartitions(partitionMetadata.toBuffer.asJava)) + case None => util.stream.Stream.empty() Review Comment: please remove the redundant space ########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java: ########## @@ -290,7 +290,7 @@ default void waitForTopic(String topic, int partitions) throws InterruptedExcept TestUtils.waitForCondition( () -> brokers.stream().allMatch(broker -> partitions == 0 ? broker.metadataCache().numPartitions(topic).isEmpty() : - broker.metadataCache().numPartitions(topic).contains(partitions) + broker.metadataCache().numPartitions(topic).get() == partitions Review Comment: `broker.metadataCache().numPartitions(topic).filter(p -> p == partitions).isPresent()` ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -239,35 +239,37 @@ class KRaftMetadataCache( } // errorUnavailableEndpoints exists to support v0 MetadataResponses - override def getTopicMetadata(topics: Set[String], + override def getTopicMetadata(topics: util.Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false, - errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = { + errorUnavailableListeners: Boolean = false): util.List[MetadataResponseTopic] = { val image = _currentImage - topics.toSeq.flatMap { topic => - getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata => - new MetadataResponseTopic() - .setErrorCode(Errors.NONE.code) - .setName(topic) - .setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID)) - .setIsInternal(Topic.isInternal(topic)) - .setPartitions(partitionMetadata.toBuffer.asJava) + topics.stream().flatMap(topic => + getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) match { + case Some(partitionMetadata) => + util.stream.Stream.of(new MetadataResponseTopic() + .setErrorCode(Errors.NONE.code) + .setName(topic) + .setTopicId(Option(image.topics().getTopic(topic).id()).getOrElse(Uuid.ZERO_UUID)) + .setIsInternal(Topic.isInternal(topic)) + .setPartitions(partitionMetadata.toBuffer.asJava)) + case None => util.stream.Stream.empty() } - } + ).collect(Collectors.toList()) } override def describeTopicResponse( - topics: Iterator[String], + topics: util.Iterator[String], listenerName: ListenerName, - topicPartitionStartIndex: String => Int, + topicPartitionStartIndex: util.function.Function[String, Integer], maximumNumberOfPartitions: Int, ignoreTopicsWithExceptions: Boolean ): DescribeTopicPartitionsResponseData = { val image = _currentImage var remaining = maximumNumberOfPartitions val result = new DescribeTopicPartitionsResponseData() breakable { - topics.foreach { topicName => + topics.asScala.foreach { topicName => Review Comment: `forEachRemaining` ########## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ########## @@ -345,68 +352,75 @@ class KRaftMetadataCache( Option(_currentImage.cluster.broker(brokerId)).count(_.inControlledShutdown) == 1 } - override def getAliveBrokers(): Iterable[BrokerMetadata] = getAliveBrokers(_currentImage) + override def getAliveBrokers(): util.List[BrokerMetadata] = getAliveBrokers(_currentImage) - private def getAliveBrokers(image: MetadataImage): Iterable[BrokerMetadata] = { - image.cluster().brokers().values().asScala.filterNot(_.fenced()). - map(b => new BrokerMetadata(b.id, b.rack)) + private def getAliveBrokers(image: MetadataImage): util.List[BrokerMetadata] = { + _currentImage.cluster().brokers().values().stream() + .filter(Predicate.not(_.fenced)) + .map(broker => new BrokerMetadata(broker.id, broker.rack)) + .collect(Collectors.toList()) } - override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { - Option(_currentImage.cluster().broker(brokerId)).filterNot(_.fenced()). - flatMap(_.node(listenerName.value()).toScala) + override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): util.Optional[Node] = { + util.Optional.ofNullable(_currentImage.cluster().broker(brokerId)) + .filter(Predicate.not(_.fenced)) + .flatMap(broker => broker.node(listenerName.value)) } - override def getAliveBrokerNodes(listenerName: ListenerName): Seq[Node] = { - _currentImage.cluster().brokers().values().asScala.filterNot(_.fenced()). - flatMap(_.node(listenerName.value()).toScala).toSeq + override def getAliveBrokerNodes(listenerName: ListenerName): util.List[Node] = { + _currentImage.cluster.brokers.values.stream + .filter(Predicate.not(_.fenced)) + .flatMap(broker => broker.node(listenerName.value).stream) + .collect(Collectors.toList()) } - override def getBrokerNodes(listenerName: ListenerName): Seq[Node] = { - _currentImage.cluster().brokers().values().asScala.flatMap(_.node(listenerName.value()).toScala).toSeq + override def getBrokerNodes(listenerName: ListenerName): util.List[Node] = { + _currentImage.cluster.brokers.values.stream + .flatMap(broker => broker.node(listenerName.value).stream) + .collect(Collectors.toList()) } - override def getLeaderAndIsr(topicName: String, partitionId: Int): Option[LeaderAndIsr] = { - Option(_currentImage.topics().getTopic(topicName)). - flatMap(topic => Option(topic.partitions().get(partitionId))). - flatMap(partition => Some(new LeaderAndIsr(partition.leader, partition.leaderEpoch, + override def getLeaderAndIsr(topicName: String, partitionId: Int): util.Optional[LeaderAndIsr] = { + util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)). + flatMap(topic => util.Optional.ofNullable(topic.partitions().get(partitionId))). + flatMap(partition => util.Optional.ofNullable(new LeaderAndIsr(partition.leader, partition.leaderEpoch, util.Arrays.asList(partition.isr.map(i => i: java.lang.Integer): _*), partition.leaderRecoveryState, partition.partitionEpoch))) } - override def numPartitions(topicName: String): Option[Int] = { - Option(_currentImage.topics().getTopic(topicName)). + override def numPartitions(topicName: String): util.Optional[Integer] = { + util.Optional.ofNullable(_currentImage.topics().getTopic(topicName)). map(topic => topic.partitions().size()) } override def topicNamesToIds(): util.Map[String, Uuid] = _currentImage.topics.topicNameToIdView() override def topicIdsToNames(): util.Map[Uuid, String] = _currentImage.topics.topicIdToNameView() - override def topicIdInfo(): (util.Map[String, Uuid], util.Map[Uuid, String]) = { + override def topicIdInfo(): util.Map.Entry[util.Map[String, Uuid], util.Map[Uuid, String]] = { val image = _currentImage - (image.topics.topicNameToIdView(), image.topics.topicIdToNameView()) + new util.AbstractMap.SimpleEntry(image.topics.topicNameToIdView(), image.topics.topicIdToNameView()) } // if the leader is not known, return None; // if the leader is known and corresponding node is available, return Some(node) // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) - override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int, listenerName: ListenerName): Option[Node] = { + override def getPartitionLeaderEndpoint(topicName: String, partitionId: Int, listenerName: ListenerName): util.Optional[Node] = { val image = _currentImage Option(image.topics().getTopic(topicName)) match { - case None => None + case None => util.Optional.empty() case Some(topic) => Option(topic.partitions().get(partitionId)) match { - case None => None + case None => util.Optional.empty() case Some(partition) => Option(image.cluster().broker(partition.leader)) match { - case None => Some(Node.noNode) - case Some(broker) => Some(broker.node(listenerName.value()).orElse(Node.noNode())) + case None => util.Optional.of(Node.noNode) + case Some(broker) => util.Optional.of(broker.node(listenerName.value()).orElse(Node.noNode())) } } } } - override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = { + override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): util.Map[Integer, Node] = { val image = _currentImage - val result = new mutable.HashMap[Int, Node]() + val result = new mutable.HashMap[Integer, Node]() Review Comment: could you please use java Map instead? ########## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ########## @@ -186,9 +188,9 @@ class AddPartitionsToTxnManager( } private def getTransactionCoordinator(partition: Int): Option[Node] = { Review Comment: we can use Java optional as it has only one usage ########## metadata/src/main/java/org/apache/kafka/metadata/MetadataCache.java: ########## @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.metadata; + +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.DescribeClientQuotasRequestData; +import org.apache.kafka.common.message.DescribeClientQuotasResponseData; +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData; +import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.FinalizedFeatures; +import org.apache.kafka.server.common.MetadataVersion; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; + +public interface MetadataCache extends ConfigRepository { + + /** + * Return topic metadata for a given set of topics and listener. See KafkaApis#handleTopicMetadataRequest for details + * on the use of the two boolean flags. + * + * @param topics The set of topics. + * @param listenerName The listener name. + * @param errorUnavailableEndpoints If true, we return an error on unavailable brokers. This is used to support + * MetadataResponse version 0. + * @param errorUnavailableListeners If true, return LEADER_NOT_AVAILABLE if the listener is not found on the leader. + * This is used for MetadataResponse versions 0-5. + * @return A collection of topic metadata. + */ + List<MetadataResponseData.MetadataResponseTopic> getTopicMetadata( + Set<String> topics, + ListenerName listenerName, + boolean errorUnavailableEndpoints, + boolean errorUnavailableListeners); + + Set<String> getAllTopics(); + + Set<TopicPartition> getTopicPartitions(String topicName); + + boolean hasAliveBroker(int brokerId); + + List<BrokerMetadata> getAliveBrokers(); + + Optional<Long> getAliveBrokerEpoch(int brokerId); + + boolean isBrokerFenced(int brokerId); + + boolean isBrokerShuttingDown(int brokerId); + + Uuid getTopicId(String topicName); + + Optional<String> getTopicName(Uuid topicId); + + Optional<Node> getAliveBrokerNode(int brokerId, ListenerName listenerName); + + List<Node> getAliveBrokerNodes(ListenerName listenerName); + + List<Node> getBrokerNodes(ListenerName listenerName); + + Optional<LeaderAndIsr> getLeaderAndIsr(String topic, int partitionId); + + /** + * Return the number of partitions in the given topic, or None if the given topic does not exist. + */ + Optional<Integer> numPartitions(String topic); + + Map<String, Uuid> topicNamesToIds(); + + Map<Uuid, String> topicIdsToNames(); + + Map.Entry<Map<String, Uuid>, Map<Uuid, String>> topicIdInfo(); + + /** + * Get a partition leader's endpoint + * + * @return If the leader is known, and the listener name is available, return Some(node). If the leader is known, + * but the listener is unavailable, return Some(Node.NO_NODE). Otherwise, if the leader is not known, + * return None + */ + Optional<Node> getPartitionLeaderEndpoint(String topic, int partitionId, ListenerName listenerName); + + Map<Integer, Node> getPartitionReplicaEndpoints(TopicPartition tp, ListenerName listenerName); + + Cluster getClusterMetadata(String clusterId, ListenerName listenerName); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org