rajinisivaram commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2162513574


##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -155,9 +155,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     case None => new DefaultQuotaCallback
   }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+  private val activeQuotaEntities = new util.HashSet[KafkaQuotaEntity]()

Review Comment:
   Do we really need to maintain this new set of quota entities? When custom quotas are used, I am not sure we can optimize based on active entities anyway. And for the default quota callback, we already have the map containing all entities; couldn't we use that instead?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,41 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    var updatedQuotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+      QuotaTypes.CustomQuotas
+    } else {
+      QuotaTypes.NoQuotas
+    }
+
+    val activeQuotaTypes = scala.collection.mutable.Set[String]()
+
+    activeQuotaEntities.forEach {
+      case KafkaQuotaEntity(Some(_), Some(_)) =>
+        updatedQuotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+        activeQuotaTypes += "UserClientIdQuota"
+      case KafkaQuotaEntity(Some(_), None) =>
+        updatedQuotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+        activeQuotaTypes += "UserQuota"
+      case KafkaQuotaEntity(None, Some(_)) =>
+        updatedQuotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+        activeQuotaTypes += "ClientIdQuota"
+      case _ => // Unexpected entity type
+    }
+
+    quotaTypesEnabled = updatedQuotaTypesEnabled

Review Comment:
   This flag is used to determine quota metric tags. In most production systems, I would expect quota types to be stable (i.e., users and client ids may be added frequently, but switching from user-based quotas to client-id-based quotas is less likely). In those systems, this PR does not add much value. It handles the less likely case where quota types are changing. In this case, have we considered the impact of frequent changes to `quotaTypesEnabled` as entities are added or removed?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,41 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
     }
   }
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    var updatedQuotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+      QuotaTypes.CustomQuotas
+    } else {
+      QuotaTypes.NoQuotas
+    }
+
+    val activeQuotaTypes = scala.collection.mutable.Set[String]()
+
+    activeQuotaEntities.forEach {

Review Comment:
   `activeQuotaEntities` is potentially large because it includes all entities for which quotas are set (every user, for example). Here we just want to find the first matching entry for each type, so it seems unnecessary to compare every element using `forEach`. Couldn't we just do some check based on the change that triggered this?
   We are holding the write lock here, so it will be good to avoid unnecessary processing.
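
One possible shape for a "check based on the change that triggered this" is sketched below. It is not code from the PR: `QuotaTypeTracker`, `added`, `removed` and the stand-in `QuotaTypes` / `KafkaQuotaEntity` definitions are hypothetical, and the bit values are assumed for illustration. The idea is to keep a small counter per quota type, so that each add or remove touches one counter and the bitmask is derived from at most three entries rather than from every entity. The sketch assumes the caller invokes `added` only when an entity first gets a quota and `removed` only when its last quota is deleted.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Kafka definitions referenced in the diff.
// The bit values below are assumptions for this sketch.
object QuotaTypes {
  val NoQuotas = 0
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4
  val CustomQuotas = 8
}

final case class KafkaQuotaEntity(user: Option[String], clientId: Option[String])

// Keeps one counter per quota type so the enabled-types bitmask can be
// refreshed from the single entity that changed, without a full scan.
class QuotaTypeTracker(customCallback: Boolean) {
  // Maps a quota-type flag to the number of active entities of that type.
  private val counts = mutable.Map[Int, Int]().withDefaultValue(0)

  // Classifies an entity the same way the pattern match in the diff does.
  private def flagFor(entity: KafkaQuotaEntity): Int = entity match {
    case KafkaQuotaEntity(Some(_), Some(_)) => QuotaTypes.UserClientIdQuotaEnabled
    case KafkaQuotaEntity(Some(_), None)    => QuotaTypes.UserQuotaEnabled
    case KafkaQuotaEntity(None, Some(_))    => QuotaTypes.ClientIdQuotaEnabled
    case _                                  => QuotaTypes.NoQuotas
  }

  // Assumed to be called once, when the entity first gets a quota.
  def added(entity: KafkaQuotaEntity): Unit = {
    val flag = flagFor(entity)
    if (flag != QuotaTypes.NoQuotas) counts(flag) += 1
  }

  // Assumed to be called once, when the entity's last quota is removed.
  def removed(entity: KafkaQuotaEntity): Unit = {
    val flag = flagFor(entity)
    if (counts(flag) > 0) counts(flag) -= 1
  }

  // ORs together the flags of every type that still has an active entity.
  def quotaTypesEnabled: Int = {
    val base = if (customCallback) QuotaTypes.CustomQuotas else QuotaTypes.NoQuotas
    counts.foldLeft(base) { case (acc, (flag, n)) => if (n > 0) acc | flag else acc }
  }
}
```

With counters like these, the work done under the write lock would be constant per change. The sketch does not address the separate question of how frequent flips of `quotaTypesEnabled` affect metric tags.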