mumrah commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1446495993
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
))
}
+ def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
Review Comment:
Can you move this code into a new class? KafkaApis is already much too large.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1409,6 +1426,77 @@ class KafkaApis(val requestChannel: RequestChannel,
))
}
+ def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): Unit = {
+ metadataCache match {
+ case _: ZkMetadataCache =>
+ throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request")
+ case _ =>
+ }
+ val KRaftMetadataCache = metadataCache.asInstanceOf[KRaftMetadataCache]
+
+ val describeTopicPartitionsRequest = request.body[DescribeTopicPartitionsRequest].data()
+ var topics = scala.collection.mutable.Set[String]()
+ describeTopicPartitionsRequest.topics().forEach(topic => topics.add(topic.name()))
+
+ val cursor = describeTopicPartitionsRequest.cursor()
+ val fetchAllTopics = topics.isEmpty
+ if (fetchAllTopics) {
+ metadataCache.getAllTopics().foreach(topic => topics.add(topic))
Review Comment:
If we're paginating through all topics and have a cursor, we can gather
only the desired topics during this O(n) loop through all topics. That
would let us avoid another O(n) operation below to filter out the undesired topics.
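
Roughly what I have in mind, as an untested sketch rather than a drop-in change. `topicsFromCursor` and `cursorTopicName` are hypothetical names, and the sketch assumes pagination walks topics in lexicographic order, which may not match what the PR does:

```scala
import scala.collection.mutable

object CursorFilterSketch {
  // Hypothetical helper: collect topic names in a single pass, skipping
  // any topic that sorts before the cursor's topic. With no cursor,
  // every topic is kept.
  // Assumption: pagination order is lexicographic by topic name.
  def topicsFromCursor(
      allTopics: Iterable[String],
      cursorTopicName: Option[String]
  ): mutable.SortedSet[String] = {
    val topics = mutable.SortedSet.empty[String]
    allTopics.foreach { topic =>
      // Filter while iterating, so no second O(n) pass is needed later.
      if (cursorTopicName.forall(start => topic >= start)) topics += topic
    }
    topics
  }

  def main(args: Array[String]): Unit = {
    val all = Seq("payments", "audit", "orders", "billing")
    println(topicsFromCursor(all, Some("orders")).mkString(", ")) // orders, payments
    println(topicsFromCursor(all, None).mkString(", "))           // audit, billing, orders, payments
  }
}
```

The point is only that the cursor check moves inside the existing loop over all topics; how the cursor's topic name is obtained is left to the real request object.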