AndrewJSchofield commented on code in PR #19493:
URL: https://github.com/apache/kafka/pull/19493#discussion_r2707490419
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleListClientMetricsResources(request: RequestChannel.Request): Unit
= {
- val listClientMetricsResourcesRequest =
request.body[ListClientMetricsResourcesRequest]
+ /**
+ * Handle ListConfigResourcesRequest. If resourceTypes are not specified, it
uses ListConfigResourcesRequest#supportedResourceTypes
+ * to retrieve config resources. If resourceTypes are specified, it returns
matched config resources.
+ * If a config resource type is not supported, the handler returns
UNSUPPORTED_VERSION.
+ */
+ private def handleListConfigResources(request: RequestChannel.Request): Unit
= {
+ val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER,
CLUSTER_NAME)) {
- requestHelper.sendMaybeThrottle(request,
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+ requestHelper.sendMaybeThrottle(request,
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
- val data = new
ListClientMetricsResourcesResponseData().setClientMetricsResources(
- clientMetricsManager.listClientMetricsResources.stream.map(
- name => new
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
- requestHelper.sendMaybeThrottle(request, new
ListClientMetricsResourcesResponse(data))
+ val data = new ListConfigResourcesResponseData()
+
+ val supportedResourceTypes =
listConfigResourcesRequest.supportedResourceTypes()
+ var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+ if (resourceTypes.isEmpty) {
+ resourceTypes = supportedResourceTypes.stream().toList
+ }
+
+ resourceTypes.forEach(resourceType =>
+ if (!supportedResourceTypes.contains(resourceType)) {
+ requestHelper.sendMaybeThrottle(request, new
ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
+ return
+ }
+ )
+
+ val result = new
util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
+ if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
+ groupConfigManager.groupIds().forEach(id =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
+ clientMetricsManager.listClientMetricsResources.forEach(name =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
+
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
+
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
+ metadataCache.getAllTopics.forEach(name =>
Review Comment:
Yes, I think that we do. The idea is that this RPC returns the list of
resources for which the configs can be described. You are correct that this
list can be very long, and it would be probably be worthwhile to add pagination
as part of the effort to extend the admin API with pagination.
In Kafka, unfortunately, we have different behaviours for different types of
resources because of history. Groups are kind of pseudo-resources which come
into existence when the first member joins, but it is possible to define
configs on a group resource whether group exists or not. I have tried to
rationalize this area (KIP-1142, for example) but some differences remain.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]