dajac commented on code in PR #19493:
URL: https://github.com/apache/kafka/pull/19493#discussion_r2707059814


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
-    val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
+  /**
+   * Handle ListConfigResourcesRequest. If resourceTypes are not specified, it 
uses ListConfigResourcesRequest#supportedResourceTypes
+   * to retrieve config resources. If resourceTypes are specified, it returns 
matched config resources.
+   * If a config resource type is not supported, the handler returns 
UNSUPPORTED_VERSION.
+   */
+  private def handleListConfigResources(request: RequestChannel.Request): Unit 
= {
+    val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
-      requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+      requestHelper.sendMaybeThrottle(request, 
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-        clientMetricsManager.listClientMetricsResources.stream.map(
-          name => new 
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
-      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+      val data = new ListConfigResourcesResponseData()
+
+      val supportedResourceTypes = 
listConfigResourcesRequest.supportedResourceTypes()
+      var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+      if (resourceTypes.isEmpty) {
+        resourceTypes = supportedResourceTypes.stream().toList
+      }
+
+      resourceTypes.forEach(resourceType =>
+        if (!supportedResourceTypes.contains(resourceType)) {
+          requestHelper.sendMaybeThrottle(request, new 
ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
+          return
+        }
+      )
+
+      val result = new 
util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
+      if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
+        groupConfigManager.groupIds().forEach(id =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
+        clientMetricsManager.listClientMetricsResources.forEach(name =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
+        
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
+        
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
+        metadataCache.getAllTopics.forEach(name =>

Review Comment:
   @AndrewJSchofield @FrankYang0529 Do we return all topics on purpose here? It 
feels a bit weird because for groups we only return the ones with dynamic 
configs, and the same goes for client metrics. The number of topics could be 
extremely large.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to