chia7712 commented on code in PR #18196: URL: https://github.com/apache/kafka/pull/18196#discussion_r1916690523
########## core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala: ########## @@ -337,6 +371,7 @@ object GroupedUserQuotaCallback { val DefaultProduceQuotaProp = "default.produce.quota" val DefaultFetchQuotaProp = "default.fetch.quota" val UnlimitedQuotaMetricTags = new util.HashMap[String, String] + val updateClusterMetadataCalls = new AtomicInteger Review Comment: why to add this count? ########## core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala: ########## @@ -136,9 +148,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { // Create large number of partitions on another broker, should result in throttling on first partition val largeTopic = "group1_largeTopic" createTopic(largeTopic, numPartitions = 99, leader = 0) + user.removeThrottleMetrics() user.waitForQuotaUpdate(8000, 2500, defaultRequestQuota) user.produceConsume(expectProduceThrottle = true, expectConsumeThrottle = true) + user.removeQuotaOverrides() Review Comment: Could we call `removeQuotaOverrides` after creating new user for consistency? ########## core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala: ########## @@ -136,9 +148,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { // Create large number of partitions on another broker, should result in throttling on first partition val largeTopic = "group1_largeTopic" createTopic(largeTopic, numPartitions = 99, leader = 0) + user.removeThrottleMetrics() Review Comment: this is unnecessary if we remove the unnecessary call of `updateClusterMetadata` ########## core/src/main/scala/kafka/server/metadata/DynamicClientQuotaPublisher.scala: ########## @@ -48,9 +51,21 @@ class DynamicClientQuotaPublisher( ): Unit = { val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" try { - Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta => - clientQuotaMetadataManager.update(clientQuotasDelta) + quotaManagers.clientQuotaCallback().ifPresent(clientQuotaCallback => { + if (delta.topicsDelta() != null || delta.clusterDelta() != null) { + val cluster = KRaftMetadataCache.toCluster(clusterId, newImage) + clientQuotaCallback.updateClusterMetadata(cluster) Review Comment: I don't think this line is required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org