apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1378834332
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -477,6 +479,128 @@ class KafkaApisTest { testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder) } + @Test + def testAlterConfigsClientMetrics(): Unit = { + val subscriptionName = "client_metric_subscription_1" + val authorizedResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, + Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) + + val props = ClientMetricsTestUtils.getDefaultProperties + val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]() + props.forEach((x, y) => + configEntries.add(new AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String], y.asInstanceOf[String]))) + + val configs = Map(authorizedResource -> new AlterConfigsRequest.Config(configEntries)) + + val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0) + val request = buildRequest( + new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)) + + when(controller.isActive).thenReturn(false) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false))) + .thenReturn(Map(authorizedResource -> ApiError.NONE)) + + createKafkaApis(authorizer = Some(authorizer)).handleAlterConfigsRequest(request) + val response = verifyNoThrottling[AlterConfigsResponse](request) + verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE)) + verify(authorizer, times(1)).authorize(any(), any()) + verify(adminManager).alterConfigs(any(), anyBoolean()) + } + + @Test + def testIncrementalClientMetricAlterConfigs(): Unit = { + val authorizer: Authorizer = mock(classOf[Authorizer]) + + val subscriptionName = "client_metric_subscription_1" + val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName) + + authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, + Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED) + + val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS, + ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0) + + val incrementalAlterConfigsRequest = getIncrementalClientMetricsAlterConfigRequestBuilder( + Seq(resource)).build(requestHeader.apiVersion) + val request = buildRequest(incrementalAlterConfigsRequest, + fromPrivilegedListener = true, requestHeader = Option(requestHeader)) + + when(controller.isActive).thenReturn(true) + when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), + any[Long])).thenReturn(0) + when(adminManager.incrementalAlterConfigs(any(), ArgumentMatchers.eq(false))) + .thenReturn(Map(resource -> ApiError.NONE)) + + createKafkaApis(authorizer = Some(authorizer)).handleIncrementalAlterConfigsRequest(request) + val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request) + verifyIncrementalAlterConfigResult(response, Map(subscriptionName -> Errors.NONE )) + verify(authorizer, times(1)).authorize(any(), any()) + verify(adminManager).incrementalAlterConfigs(any(), anyBoolean()) + } + + private def getIncrementalClientMetricsAlterConfigRequestBuilder(configResources: Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = { + val resourceMap = configResources.map(configResource => { + val entryToBeModified = new ConfigEntry("metrics", "foo.bar") + configResource -> Set(new AlterConfigOp(entryToBeModified, OpType.SET)).asJavaCollection + }).toMap.asJava + new IncrementalAlterConfigsRequest.Builder(resourceMap, false) + } + + @Test + def testDescribeConfigsClientMetrics(): Unit = { + val authorizer: Authorizer = mock(classOf[Authorizer]) + val operation = AclOperation.DESCRIBE_CONFIGS + val resourceType = ResourceType.CLUSTER + val subscriptionName = "client_metric_subscription_1" + val requestHeader = + new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0) + val expectedActions = Seq( + new Action(operation, new ResourcePattern(resourceType, Resource.CLUSTER_NAME, PatternType.LITERAL), + 1, true, true) + ) + // Verify that authorize is only called once Review Comment: Corrected. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org