apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1378834332


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -477,6 +479,128 @@ class KafkaApisTest {
     testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
   }
 
+  @Test
+  def testAlterConfigsClientMetrics(): Unit = {
+    val subscriptionName = "client_metric_subscription_1"
+    val authorizedResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, subscriptionName)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val props = ClientMetricsTestUtils.getDefaultProperties
+    val configEntries = new util.ArrayList[AlterConfigsRequest.ConfigEntry]()
+    props.forEach((x, y) =>
+      configEntries.add(new 
AlterConfigsRequest.ConfigEntry(x.asInstanceOf[String], 
y.asInstanceOf[String])))
+
+    val configs = Map(authorizedResource -> new 
AlterConfigsRequest.Config(configEntries))
+
+    val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0)
+    val request = buildRequest(
+      new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion))
+
+    when(controller.isActive).thenReturn(false)
+    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
+      .thenReturn(Map(authorizedResource -> ApiError.NONE))
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleAlterConfigsRequest(request)
+    val response = verifyNoThrottling[AlterConfigsResponse](request)
+    verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE))
+    verify(authorizer, times(1)).authorize(any(), any())
+    verify(adminManager).alterConfigs(any(), anyBoolean())
+  }
+
+  @Test
+  def testIncrementalClientMetricAlterConfigs(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+
+    val subscriptionName = "client_metric_subscription_1"
+    val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, 
subscriptionName)
+
+    authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, 
ResourceType.CLUSTER,
+      Resource.CLUSTER_NAME, AuthorizationResult.ALLOWED)
+
+    val requestHeader = new RequestHeader(ApiKeys.INCREMENTAL_ALTER_CONFIGS,
+      ApiKeys.INCREMENTAL_ALTER_CONFIGS.latestVersion, clientId, 0)
+
+    val incrementalAlterConfigsRequest = 
getIncrementalClientMetricsAlterConfigRequestBuilder(
+      Seq(resource)).build(requestHeader.apiVersion)
+    val request = buildRequest(incrementalAlterConfigsRequest,
+      fromPrivilegedListener = true, requestHeader = Option(requestHeader))
+
+    when(controller.isActive).thenReturn(true)
+    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+    when(adminManager.incrementalAlterConfigs(any(), 
ArgumentMatchers.eq(false)))
+      .thenReturn(Map(resource -> ApiError.NONE))
+
+    createKafkaApis(authorizer = 
Some(authorizer)).handleIncrementalAlterConfigsRequest(request)
+    val response = verifyNoThrottling[IncrementalAlterConfigsResponse](request)
+    verifyIncrementalAlterConfigResult(response, Map(subscriptionName -> 
Errors.NONE ))
+    verify(authorizer, times(1)).authorize(any(), any())
+    verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
+  }
+
+  private def 
getIncrementalClientMetricsAlterConfigRequestBuilder(configResources: 
Seq[ConfigResource]): IncrementalAlterConfigsRequest.Builder = {
+    val resourceMap = configResources.map(configResource => {
+      val entryToBeModified = new ConfigEntry("metrics", "foo.bar")
+      configResource -> Set(new AlterConfigOp(entryToBeModified, 
OpType.SET)).asJavaCollection
+    }).toMap.asJava
+    new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
+  }
+
+  @Test
+  def testDescribeConfigsClientMetrics(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val operation = AclOperation.DESCRIBE_CONFIGS
+    val resourceType = ResourceType.CLUSTER
+    val subscriptionName = "client_metric_subscription_1"
+    val requestHeader =
+      new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, 
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, 
Resource.CLUSTER_NAME, PatternType.LITERAL),
+        1, true, true)
+    )
+    // Verify that authorize is only called once

Review Comment:
   Corrected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to