rajinisivaram commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2162513574


##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -155,9 +155,10 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     case None => new DefaultQuotaCallback
   }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+  private val activeQuotaEntities = new util.HashSet[KafkaQuotaEntity]()

Review Comment:
   Do we really need to maintain this new set of quota entities? When custom 
quotas are used, I am not sure we can optimize based on active entities anyway. 
And for the default quota callback, we already have the map containing all 
entities. Couldn't we use that instead?
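
   As a rough sketch of what deriving the flag from an existing map could look 
like (not the actual `ClientQuotaManager` or callback code): `Entity`, 
`enabledTypes`, the `Double` quota value, and the constant values below are 
hypothetical stand-ins chosen for illustration.

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

object QuotaTypesFromMapSketch {
  // Hypothetical stand-in for KafkaQuotaEntity: (user, clientId)
  final case class Entity(user: Option[String], clientId: Option[String])

  // Assumed bit values, for illustration only
  val NoQuotas = 0
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4

  // Derives the enabled quota types from the keys of an existing quota map.
  // `exists` stops at the first match, so each check ends early when a
  // matching entity is present, but still scans every key when none is.
  def enabledTypes(quotas: JMap[Entity, Double]): Int = {
    val keys = quotas.keySet.asScala
    var enabled = NoQuotas
    if (keys.exists(e => e.user.isDefined && e.clientId.isDefined))
      enabled |= UserClientIdQuotaEnabled
    if (keys.exists(e => e.user.isDefined && e.clientId.isEmpty))
      enabled |= UserQuotaEnabled
    if (keys.exists(e => e.user.isEmpty && e.clientId.isDefined))
      enabled |= ClientIdQuotaEnabled
    enabled
  }
}
```

   This avoids a second collection, at the cost of a scan whenever the flag is 
recomputed.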



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,41 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled = 5 (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    var updatedQuotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+        QuotaTypes.CustomQuotas
+      } else {
+        QuotaTypes.NoQuotas
+      }
+
+    val activeQuotaTypes = scala.collection.mutable.Set[String]()
+
+    activeQuotaEntities.forEach {
+        case KafkaQuotaEntity(Some(_), Some(_)) =>
+          updatedQuotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
+          activeQuotaTypes += "UserClientIdQuota"
+        case KafkaQuotaEntity(Some(_), None) =>
+          updatedQuotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
+          activeQuotaTypes += "UserQuota"
+        case KafkaQuotaEntity(None, Some(_)) =>
+          updatedQuotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
+          activeQuotaTypes += "ClientIdQuota"
+        case _ => // Unexpected entity type
+    }
+
+    quotaTypesEnabled = updatedQuotaTypesEnabled

Review Comment:
   This flag is used to determine quota metric tags. In most production 
systems, I would expect quota types to be stable (i.e., users and client ids 
may be added frequently, but switching from user-based quotas to 
client-id-based quotas is less likely). In those systems, this PR does not add 
much value. It handles the less likely case where quota types are changing. In 
that case, have we considered the impact of frequent changes to 
`quotaTypesEnabled` as entities are added or removed?
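
   To make the concern concrete, here is a minimal sketch of a flag that only 
changes value when the set of enabled types actually changes, and counts how 
often that happens. `QuotaTypeFlag`, `transitionCount`, and the constant values 
are hypothetical and are not part of the PR.

```scala
object QuotaTypeFlagSketch {
  // Assumed bit values, for illustration only
  val NoQuotas = 0
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4

  final class QuotaTypeFlag {
    @volatile private var enabled: Int = NoQuotas
    private var transitions: Int = 0

    // Stores the new bitmask only when it differs from the current one.
    // Returns true when the value changed, i.e. when metric tags would
    // be computed differently from that point on.
    def update(newValue: Int): Boolean = synchronized {
      if (newValue == enabled) false
      else {
        enabled = newValue
        transitions += 1
        true
      }
    }

    def current: Int = enabled

    def transitionCount: Int = synchronized(transitions)
  }
}
```

   Adding a second user leaves the flag untouched, while adding the first 
client-id quota flips it. The second kind of event is the one whose frequency 
matters here.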



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +450,41 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled = 5 (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   */
+  private def updateQuotaTypes(): Unit = {
+    var updatedQuotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) {
+        QuotaTypes.CustomQuotas
+      } else {
+        QuotaTypes.NoQuotas
+      }
+
+    val activeQuotaTypes = scala.collection.mutable.Set[String]()
+
+    activeQuotaEntities.forEach {

Review Comment:
   `activeQuotaEntities` is potentially large because it includes all entities 
for which quotas are set (every user, for example). Here we just want to find 
the first matching entry for each type, so it seems unnecessary to compare 
every element using `forEach`. Couldn't we just do some check based on the 
change that triggered this? We are holding the write lock here, so it would be 
good to avoid unnecessary processing.
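
   One possible shape for a change-driven check, sketched under assumptions: 
keep a count per quota type and adjust it when an entity is added or removed, 
so the work under the lock does not grow with the number of entities. `Entity`, 
`Counts`, `bitFor`, and the constant values are hypothetical stand-ins, not the 
PR's code.

```scala
import scala.collection.mutable

object QuotaTypeCountsSketch {
  // Hypothetical stand-in for KafkaQuotaEntity: (user, clientId)
  final case class Entity(user: Option[String], clientId: Option[String])

  // Assumed bit values, for illustration only
  val NoQuotas = 0
  val ClientIdQuotaEnabled = 1
  val UserQuotaEnabled = 2
  val UserClientIdQuotaEnabled = 4

  // Maps an entity to the single quota-type bit it contributes.
  def bitFor(entity: Entity): Int = entity match {
    case Entity(Some(_), Some(_)) => UserClientIdQuotaEnabled
    case Entity(Some(_), None)    => UserQuotaEnabled
    case Entity(None, Some(_))    => ClientIdQuotaEnabled
    case _                        => NoQuotas
  }

  final class Counts {
    private val counts = mutable.Map.empty[Int, Int].withDefaultValue(0)

    // Call only when a quota is set for an entity that had none before;
    // updating an existing quota must not be counted twice.
    def added(entity: Entity): Unit = {
      val bit = bitFor(entity)
      if (bit != NoQuotas) counts(bit) += 1
    }

    // Call only when the quota for an existing entity is removed.
    def removed(entity: Entity): Unit = {
      val bit = bitFor(entity)
      if (counts(bit) > 0) counts(bit) -= 1
    }

    // ORs together the bits of every type that still has an entity.
    // At most three keys are ever present, so this is constant work.
    def enabled: Int =
      counts.foldLeft(NoQuotas) { case (acc, (bit, n)) =>
        if (n > 0) acc | bit else acc
      }
  }
}
```

   The trade-off is that callers must distinguish a new entity from an update 
to an existing one, otherwise the counts drift.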


