ahuang98 commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2101129753


##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,21 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     try {
       val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-      if (userEntity.nonEmpty) {
-        if (quotaEntity.clientIdEntity.nonEmpty)
-          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-        else
-          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-      } else if (clientEntity.nonEmpty)
-        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
+      val quotaTypes = (userEntity.nonEmpty, clientEntity.nonEmpty) match {
+        case (true, true) => QuotaTypes.UserClientIdQuotaEnabled
+        case (true, false) => QuotaTypes.UserQuotaEnabled
+        case (false, true) => QuotaTypes.ClientIdQuotaEnabled
+        case (false, false) => QuotaTypes.NoQuotas
+      }
       quota match {
-        case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, 
quotaEntity, newQuota.bound)
-        case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+        case Some(newQuota) =>
+          quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
newQuota.bound)
+          updateQuotaTypes(quotaTypes, increment = true)
+        case None =>
+          quotaCallback.removeQuota(clientQuotaType, quotaEntity) // change 
here.

Review Comment:
   Don't forget to remove the leftover `// change here.` code comment.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -428,18 +426,21 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     try {
       val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-      if (userEntity.nonEmpty) {
-        if (quotaEntity.clientIdEntity.nonEmpty)
-          quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-        else
-          quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-      } else if (clientEntity.nonEmpty)
-        quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
+      val quotaTypes = (userEntity.nonEmpty, clientEntity.nonEmpty) match {
+        case (true, true) => QuotaTypes.UserClientIdQuotaEnabled
+        case (true, false) => QuotaTypes.UserQuotaEnabled
+        case (false, true) => QuotaTypes.ClientIdQuotaEnabled
+        case (false, false) => QuotaTypes.NoQuotas

Review Comment:
   Just want to clarify my understanding of `NoQuotas`: if this method were 
called with a non-empty `quota` but an empty `userEntity` and an empty 
`clientEntity`, would that essentially be a no-op? I'm assuming that 
combination is not expected/possible.



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     }
   }
 
+  @Test
+  def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = {
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.CONTROLLER_MUTATION, time, "")
+    try {
+      // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and 
quotasEnabled should be false
+      assertEquals(QuotaTypes.NoQuotas, 
clientQuotaManager.getQuotaTypesEnabled)
+      assertFalse(clientQuotaManager.quotasEnabled)
+
+      // Add a client-id quota and quotaTypesEnabled should be 
QuotaTypes.ClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(
+        None,
+        Some(ClientQuotaManager.ClientIdEntity("client1")),
+        Some(new Quota(5, true))
+      )
+      assertEquals(QuotaTypes.ClientIdQuotaEnabled, 
clientQuotaManager.getQuotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+

Review Comment:
   could we add another case here where we add another `ClientIdQuota` and show 
that `getQuotaTypesEnabled` still remains `QuotaTypes.ClientIdQuotaEnabled`?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +452,43 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quota types counts and quotaTypesEnabled flag.
+   * @param quotaTypeKey The QuotaTypes constant (e.g., 
QuotaTypes.UserClientIdQuotaEnabled)
+   * @param increment True to increment count, false to decrement
+   */
+  private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
+    if (quotaTypeKey == QuotaTypes.NoQuotas) {
+      return
+    }
+    val previousQuotaTypesEnabled = quotaTypesEnabled
+
+    // Update activeQuotaTypes counts
+    activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
+      if (increment) Option(count).getOrElse(0) + 1
+      else if (Option(count).exists(_ > 1)) count - 1
+      else 0
+    )
+
+    // Update quotaTypesEnabled based on activeQuotaTypes counts
+    quotaTypesEnabled = clientQuotaCallbackPlugin match {
+      case Some(_) => QuotaTypes.CustomQuotas
+      case None =>
+        var newQuotaTypes = QuotaTypes.NoQuotas
+        if (activeQuotaTypes.getOrDefault(QuotaTypes.UserClientIdQuotaEnabled, 
0) > 0)
+          newQuotaTypes |= QuotaTypes.UserClientIdQuotaEnabled
+        if (activeQuotaTypes.getOrDefault(QuotaTypes.UserQuotaEnabled, 0) > 0)
+          newQuotaTypes |= QuotaTypes.UserQuotaEnabled
+        if (activeQuotaTypes.getOrDefault(QuotaTypes.ClientIdQuotaEnabled, 0) 
> 0)
+          newQuotaTypes |= QuotaTypes.ClientIdQuotaEnabled
+        newQuotaTypes
+    }
+
+    if (previousQuotaTypesEnabled != quotaTypesEnabled) {
+      debug(s"Quota types enabled changed from $previousQuotaTypesEnabled to 
$quotaTypesEnabled")

Review Comment:
   Let's say we have positive counts for both 
`QuotaTypes.UserClientIdQuotaEnabled` and `QuotaTypes.UserQuotaEnabled`. When 
we print `quotaTypesEnabled`, would we expect to see `6` printed?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +452,43 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quota types counts and quotaTypesEnabled flag.
+   * @param quotaTypeKey The QuotaTypes constant (e.g., 
QuotaTypes.UserClientIdQuotaEnabled)
+   * @param increment True to increment count, false to decrement
+   */
+  private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
+    if (quotaTypeKey == QuotaTypes.NoQuotas) {
+      return
+    }
+    val previousQuotaTypesEnabled = quotaTypesEnabled
+
+    // Update activeQuotaTypes counts
+    activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
+      if (increment) Option(count).getOrElse(0) + 1
+      else if (Option(count).exists(_ > 1)) count - 1
+      else 0
+    )
+
+    // Update quotaTypesEnabled based on activeQuotaTypes counts
+    quotaTypesEnabled = clientQuotaCallbackPlugin match {
+      case Some(_) => QuotaTypes.CustomQuotas
+      case None =>
+        var newQuotaTypes = QuotaTypes.NoQuotas
+        if (activeQuotaTypes.getOrDefault(QuotaTypes.UserClientIdQuotaEnabled, 
0) > 0)
+          newQuotaTypes |= QuotaTypes.UserClientIdQuotaEnabled

Review Comment:
   should we just make this `newQuotaTypes = 
QuotaTypes.UserClientIdQuotaEnabled`?



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +452,43 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Helper method to update quota types counts and quotaTypesEnabled flag.
+   * @param quotaTypeKey The QuotaTypes constant (e.g., 
QuotaTypes.UserClientIdQuotaEnabled)
+   * @param increment True to increment count, false to decrement
+   */
+  private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
+    if (quotaTypeKey == QuotaTypes.NoQuotas) {
+      return
+    }
+    val previousQuotaTypesEnabled = quotaTypesEnabled
+
+    // Update activeQuotaTypes counts
+    activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
+      if (increment) Option(count).getOrElse(0) + 1
+      else if (Option(count).exists(_ > 1)) count - 1
+      else 0
+    )
+
+    // Update quotaTypesEnabled based on activeQuotaTypes counts

Review Comment:
   It might be helpful if this comment explained the bitwise operation, 
potentially with an example.
   
   e.g. `quotaTypesEnabled` is a bitmask of which quota types are currently 
enabled. For instance, if `quotaTypesEnabled == 6`, then we know that 
`UserClientIdQuotaEnabled` and `UserQuotaEnabled` are both enabled, since their 
bit representations are 4 and 2 (4 | 2 = 6).



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     }
   }
 
+  @Test
+  def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = {
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.CONTROLLER_MUTATION, time, "")
+    try {
+      // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and 
quotasEnabled should be false
+      assertEquals(QuotaTypes.NoQuotas, 
clientQuotaManager.getQuotaTypesEnabled)
+      assertFalse(clientQuotaManager.quotasEnabled)
+
+      // Add a client-id quota and quotaTypesEnabled should be 
QuotaTypes.ClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(
+        None,
+        Some(ClientQuotaManager.ClientIdEntity("client1")),
+        Some(new Quota(5, true))
+      )
+      assertEquals(QuotaTypes.ClientIdQuotaEnabled, 
clientQuotaManager.getQuotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a user quota and quotaTypesEnabled should be 
QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(
+        Some(ClientQuotaManager.UserEntity("userA")),
+        None,
+        Some(new Quota(5, true))
+      )
+      assertEquals(QuotaTypes.UserQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a user-client-id quota and quotaTypesEnabled should
+      // be QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled
+      clientQuotaManager.updateQuota(
+        Some(ClientQuotaManager.UserEntity("userB")),
+        Some(ClientQuotaManager.ClientIdEntity("client2")),
+        Some(new Quota(10, true))
+      )
+      assertEquals(
+        QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled 
| QuotaTypes.UserQuotaEnabled,
+        clientQuotaManager.getQuotaTypesEnabled
+      )
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Remove the user quota and quotaTypesEnabled should be 
QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled

Review Comment:
   With the above suggestions, the remove cases become more interesting too: 
after removing just one `ClientIdQuota`, we would expect that 
`quotaTypesEnabled` does not change yet.



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     }
   }
 
+  @Test
+  def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = {
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.CONTROLLER_MUTATION, time, "")
+    try {
+      // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and 
quotasEnabled should be false
+      assertEquals(QuotaTypes.NoQuotas, 
clientQuotaManager.getQuotaTypesEnabled)
+      assertFalse(clientQuotaManager.quotasEnabled)
+
+      // Add a client-id quota and quotaTypesEnabled should be 
QuotaTypes.ClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(
+        None,
+        Some(ClientQuotaManager.ClientIdEntity("client1")),
+        Some(new Quota(5, true))
+      )
+      assertEquals(QuotaTypes.ClientIdQuotaEnabled, 
clientQuotaManager.getQuotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a user quota and quotaTypesEnabled should be 
QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(
+        Some(ClientQuotaManager.UserEntity("userA")),
+        None,
+        Some(new Quota(5, true))
+      )
+      assertEquals(QuotaTypes.UserQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a user-client-id quota and quotaTypesEnabled should
+      // be QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled
+      clientQuotaManager.updateQuota(
+        Some(ClientQuotaManager.UserEntity("userB")),
+        Some(ClientQuotaManager.ClientIdEntity("client2")),
+        Some(new Quota(10, true))
+      )
+      assertEquals(
+        QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled 
| QuotaTypes.UserQuotaEnabled,
+        clientQuotaManager.getQuotaTypesEnabled
+      )
+      assertTrue(clientQuotaManager.quotasEnabled)
+

Review Comment:
   could we add another case here where we add a duplicate quota type (e.g. 
UserClientIdQuota) and show that quotasEnabled still remains the same value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to