CalvinConfluent commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1450863295


##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition that can't be returned due to the limit.
+   * If no partitions of a topic can be returned because the quota limit has been reached, that topic will not be
+   * included in the response.
+   *
+   * Note that the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the first topic.
+   * @param maximumNumberOfPartitions     The maximum number of partitions to return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>
+      if (remaining > 0) {
+        val partitionResponse = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+        partitionResponse.map( partitions => {
+          val upperIndex = startIndex + remaining
+          val response = new DescribeTopicPartitionsResponseTopic()
+            .setErrorCode(Errors.NONE.code)
+            .setName(topicName)
+            .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+            .setIsInternal(Topic.isInternal(topicName))
+            .setPartitions(partitions.filter(partition => {
+              partition.partitionIndex() >= startIndex && partition.partitionIndex() < upperIndex
+            }).asJava)
+          remaining -= response.partitions().size()
+          result.topics().add(response)
+
+          if (upperIndex < partitions.size) {
+            result.setNextCursor(new Cursor()
+              .setTopicName(topicName)
+              .setPartitionIndex(upperIndex)
+            )
+            remaining = -1
+          }
+        })
+
+        // start index only applies to the first topic. Reset it here.
+        startIndex = 0
+
+        if (!partitionResponse.isDefined) {
+          val error = try {
+            Topic.validate(topicName)
+            Errors.UNKNOWN_TOPIC_OR_PARTITION

Review Comment:
   Updated. If we fetch all topics, we ignore the topics with errors here.
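
   For illustration only, here is a minimal sketch (not the PR's actual code) of how the "skip topics with errors when fetching all topics" behavior could continue the truncated block above; the fetchAllTopics flag is a hypothetical name standing in for however the request signals a full-topic fetch:

       // Hypothetical sketch: when the lookup returns no partitions, classify the
       // error, but only surface it when specific topics were requested; a
       // fetch-all request simply skips the topic.
       // Assumes org.apache.kafka.common.errors.InvalidTopicException is imported.
       if (partitionResponse.isEmpty) {
         val error = try {
           Topic.validate(topicName)
           Errors.UNKNOWN_TOPIC_OR_PARTITION
         } catch {
           case _: InvalidTopicException => Errors.INVALID_TOPIC_EXCEPTION
         }
         if (!fetchAllTopics) {
           result.topics().add(new DescribeTopicPartitionsResponseTopic()
             .setErrorCode(error.code)
             .setName(topicName))
         }
       }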


