jolshan commented on a change in pull request #9473:
URL: https://github.com/apache/kafka/pull/9473#discussion_r512068125



##########
File path: core/src/main/scala/kafka/zk/KafkaZkClient.scala
##########
@@ -577,6 +605,27 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
     retryRequestsUntilConnected(deleteRequests, expectedControllerEpochZkVersion)
   }
 
+  /**
+   * Gets the topic IDs for the given topics.
+   * @param topics the topics we wish to retrieve the Topic IDs for
+   * @return the Topic IDs
+   */
+  def getTopicIdsForTopics(topics: Set[String]): Map[String, UUID] = {
+    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
+    val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
+    getDataResponses.map { getDataResponse =>
+      val topic = getDataResponse.ctx.get.asInstanceOf[String]
+      getDataResponse.resultCode match {
+        case Code.OK => Some(TopicZNode.decode(topic, getDataResponse.data))
+        case Code.NONODE => None
+        case _ => throw getDataResponse.resultException.get
+      }
+    }.filter(_.flatMap(_.topicId).isDefined)

Review comment:
       If a topic was created on an older version (before topic IDs existed),
its topic ID will not be defined. However, I believe that wherever this method
is used, topic IDs should already be defined. (I'm expecting a topic ID on the
following line, so an error would occur there if it were missing.) I think it
would make sense to remove the filter line and perhaps throw an error here,
earlier, if the topic ID is not set.
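
A minimal sketch of that fail-fast alternative, for illustration only. The types `DecodedTopic` and `ReadResult` and the object `TopicIdLookup` are hypothetical stand-ins, not the real `KafkaZkClient` classes, and the sketch assumes that a missing znode should still be skipped while an existing topic without an ID is treated as an error:

```scala
import java.util.UUID

// Hypothetical stand-in for the decoded znode payload; not Kafka's real type.
final case class DecodedTopic(topic: String, topicId: Option[UUID])

// Hypothetical outcome of a single ZooKeeper read for one topic.
sealed trait ReadResult
final case class Found(decoded: DecodedTopic) extends ReadResult
case object NoNode extends ReadResult

object TopicIdLookup {
  // Instead of filtering out topics whose ID is undefined, throw as soon as
  // one is seen, so the failure is reported here rather than by a later caller.
  def topicIds(results: Map[String, ReadResult]): Map[String, UUID] =
    results.collect { case (topic, Found(decoded)) =>
      val id = decoded.topicId.getOrElse(
        throw new IllegalStateException(s"Topic $topic has no topic ID"))
      topic -> id
    }
}
```

Whether `IllegalStateException` is the right exception type, and whether missing znodes should also be an error, are open choices that this sketch does not settle.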



