AndrewJSchofield commented on code in PR #19493:
URL: https://github.com/apache/kafka/pull/19493#discussion_r2707490419


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleListClientMetricsResources(request: RequestChannel.Request): Unit 
= {
-    val listClientMetricsResourcesRequest = 
request.body[ListClientMetricsResourcesRequest]
+  /**
+   * Handle ListConfigResourcesRequest. If resourceTypes are not specified, it 
uses ListConfigResourcesRequest#supportedResourceTypes
+   * to retrieve config resources. If resourceTypes are specified, it returns 
matched config resources.
+   * If a config resource type is not supported, the handler returns 
UNSUPPORTED_VERSION.
+   */
+  private def handleListConfigResources(request: RequestChannel.Request): Unit 
= {
+    val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
 
     if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)) {
-      requestHelper.sendMaybeThrottle(request, 
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+      requestHelper.sendMaybeThrottle(request, 
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
     } else {
-      val data = new 
ListClientMetricsResourcesResponseData().setClientMetricsResources(
-        clientMetricsManager.listClientMetricsResources.stream.map(
-          name => new 
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
-      requestHelper.sendMaybeThrottle(request, new 
ListClientMetricsResourcesResponse(data))
+      val data = new ListConfigResourcesResponseData()
+
+      val supportedResourceTypes = 
listConfigResourcesRequest.supportedResourceTypes()
+      var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+      if (resourceTypes.isEmpty) {
+        resourceTypes = supportedResourceTypes.stream().toList
+      }
+
+      resourceTypes.forEach(resourceType =>
+        if (!supportedResourceTypes.contains(resourceType)) {
+          requestHelper.sendMaybeThrottle(request, new 
ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
+          return
+        }
+      )
+
+      val result = new 
util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
+      if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
+        groupConfigManager.groupIds().forEach(id =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
+        clientMetricsManager.listClientMetricsResources.forEach(name =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
+        
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
+        
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+          result.add(new 
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
+        )
+      }
+      if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
+        metadataCache.getAllTopics.forEach(name =>

Review Comment:
   Yes, I think that we do. The idea is that this RPC returns the list of 
resources for which the configs can be described. You are correct that this 
list can be very long, and it would probably be worthwhile to add pagination 
as part of the effort to extend the admin API with pagination.
   
   In Kafka, unfortunately, we have different behaviours for different types of 
resources because of history. Groups are kind of pseudo-resources which come 
into existence when the first member joins, but it is possible to define 
configs on a group resource whether the group exists or not. I have tried to 
rationalize this area (KIP-1142, for example) but some differences remain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to