dajac commented on code in PR #19493:
URL: https://github.com/apache/kafka/pull/19493#discussion_r2707059814
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2927,16 +2927,60 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- def handleListClientMetricsResources(request: RequestChannel.Request): Unit
= {
- val listClientMetricsResourcesRequest =
request.body[ListClientMetricsResourcesRequest]
+ /**
+ * Handle ListConfigResourcesRequest. If resourceTypes are not specified, it
uses ListConfigResourcesRequest#supportedResourceTypes
+ * to retrieve config resources. If resourceTypes are specified, it returns
matched config resources.
+ * If a config resource type is not supported, the handler returns
UNSUPPORTED_VERSION.
+ */
+ private def handleListConfigResources(request: RequestChannel.Request): Unit
= {
+ val listConfigResourcesRequest = request.body[ListConfigResourcesRequest]
if (!authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER,
CLUSTER_NAME)) {
- requestHelper.sendMaybeThrottle(request,
listClientMetricsResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
+ requestHelper.sendMaybeThrottle(request,
listConfigResourcesRequest.getErrorResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
} else {
- val data = new
ListClientMetricsResourcesResponseData().setClientMetricsResources(
- clientMetricsManager.listClientMetricsResources.stream.map(
- name => new
ClientMetricsResource().setName(name)).collect(Collectors.toList()))
- requestHelper.sendMaybeThrottle(request, new
ListClientMetricsResourcesResponse(data))
+ val data = new ListConfigResourcesResponseData()
+
+ val supportedResourceTypes =
listConfigResourcesRequest.supportedResourceTypes()
+ var resourceTypes = listConfigResourcesRequest.data().resourceTypes()
+ if (resourceTypes.isEmpty) {
+ resourceTypes = supportedResourceTypes.stream().toList
+ }
+
+ resourceTypes.forEach(resourceType =>
+ if (!supportedResourceTypes.contains(resourceType)) {
+ requestHelper.sendMaybeThrottle(request, new
ListConfigResourcesResponse(data.setErrorCode(Errors.UNSUPPORTED_VERSION.code())))
+ return
+ }
+ )
+
+ val result = new
util.ArrayList[ListConfigResourcesResponseData.ConfigResource]()
+ if (resourceTypes.contains(ConfigResource.Type.GROUP.id)) {
+ groupConfigManager.groupIds().forEach(id =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(id).setResourceType(ConfigResource.Type.GROUP.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id)) {
+ clientMetricsManager.listClientMetricsResources.forEach(name =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(name).setResourceType(ConfigResource.Type.CLIENT_METRICS.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.BROKER_LOGGER.id)) {
+
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.BROKER.id)) {
+
metadataCache.getBrokerNodes(request.context.listenerName).forEach(node =>
+ result.add(new
ListConfigResourcesResponseData.ConfigResource().setResourceName(node.id.toString).setResourceType(ConfigResource.Type.BROKER.id))
+ )
+ }
+ if (resourceTypes.contains(ConfigResource.Type.TOPIC.id)) {
+ metadataCache.getAllTopics.forEach(name =>
Review Comment:
@AndrewJSchofield @FrankYang0529 Do we return all topics on purpose here? It
feels a bit weird because for groups we only return the ones with dynamic
configs, and likewise for client metrics. The number of topics could be extremely large.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]