chia7712 commented on code in PR #18196:
URL: https://github.com/apache/kafka/pull/18196#discussion_r1916690523


##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -337,6 +371,7 @@ object GroupedUserQuotaCallback {
   val DefaultProduceQuotaProp = "default.produce.quota"
   val DefaultFetchQuotaProp = "default.fetch.quota"
   val UnlimitedQuotaMetricTags = new util.HashMap[String, String]
+  val updateClusterMetadataCalls = new AtomicInteger

Review Comment:
   Why do we need to add this count?
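   If the intent is to count how many times the broker invokes the callback, a rough sketch of that usage could look like the code below. This is only an illustration; the object name, the `recordUpdate` helper, and the expected count are assumptions, not code from this PR.

   ```scala
   import java.util.concurrent.atomic.AtomicInteger

   import org.apache.kafka.common.Cluster

   // Hypothetical illustration: a shared counter bumped from the quota callback's
   // updateClusterMetadata path so the test can assert how often the broker pushed metadata.
   object UpdateCallCounter {
     val updateClusterMetadataCalls = new AtomicInteger

     // Invoke this from the callback's updateClusterMetadata implementation.
     def recordUpdate(cluster: Cluster): Unit = {
       updateClusterMetadataCalls.incrementAndGet()
     }
   }

   // Test-side check (the expected value of 1 is an assumption):
   // assertEquals(1, UpdateCallCounter.updateClusterMetadataCalls.get)
   ```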



##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -136,9 +148,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     // Create large number of partitions on another broker, should result in throttling on first partition
     val largeTopic = "group1_largeTopic"
     createTopic(largeTopic, numPartitions = 99, leader = 0)
+    user.removeThrottleMetrics()
     user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota)
     user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
 
+    user.removeQuotaOverrides()

Review Comment:
   Could we call `removeQuotaOverrides` right after creating the new user, for consistency?
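   For illustration, the suggested ordering could look like the sketch below; the `TestUser` trait and the method shapes are assumptions made for the sketch, not the test's actual types.

   ```scala
   // Hypothetical shape of the per-user test handle, only to show the suggested ordering.
   trait TestUser {
     def removeQuotaOverrides(): Unit
     def produceConsume(expectProduceThrottle: Boolean, expectConsumeThrottle: Boolean): Unit
   }

   def exercise(newUser: TestUser): Unit = {
     // Clear overrides right after the user is created, rather than after produceConsume,
     // so every user starts from the same quota state.
     newUser.removeQuotaOverrides()
     newUser.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true)
   }
   ```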



##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -136,9 +148,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
     // Create large number of partitions on another broker, should result in throttling on first partition
     val largeTopic = "group1_largeTopic"
     createTopic(largeTopic, numPartitions = 99, leader = 0)
+    user.removeThrottleMetrics()

Review Comment:
   this is unnecessary if we remove the redundant call to `updateClusterMetadata`



##########
core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala:
##########
@@ -48,9 +51,21 @@ class DynamicClientQuotaPublisher(
   ): Unit = {
     val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
     try {
-        Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
-          clientQuotaMetadataManager.update(clientQuotasDelta)
+      quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => {
+        if (delta.topicsDelta() != null || delta.clusterDelta() != null) {
+          val cluster = KRaftMetadataCache.toCluster(clusterId, newImage)
+          clientQuotaCallback.updateClusterMetadata(cluster)

Review Comment:
   I don't think this line is required.
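   If the cluster-metadata push is dropped here, the body could fall back to something like the removed lines above. A minimal sketch (`delta` and `clientQuotaMetadataManager` come from the surrounding publisher code, not new additions):

   ```scala
   // Only react to actual client-quota changes; skip the extra cluster-metadata push here.
   Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
     clientQuotaMetadataManager.update(clientQuotasDelta)
   }
   ```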



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
