rajinisivaram commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2167158972


##########
clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java:
##########
@@ -106,6 +107,14 @@ public interface ClientQuotaCallback extends Configurable {
      */
     boolean updateClusterMetadata(Cluster cluster);
 
+    /**
+     * Return a set of active client quota entities, which represent the 
quotas currently in effect and applicable.
+     * If the callback does not track active client quotas, it may return an 
empty set.
+     *
+     * @return a set of active client quota entities
+     */
+    Set<ClientQuotaEntity> getActiveQuotasEntities();

Review Comment:
   This is public API, we cannot change it without a KIP. I don't think we need 
this in the interface anyway. 
   
   When custom callbacks are used, the code that relies on quotaTypesEnabled 
doesn't care about any of the types except QuotaTypes.CustomQuotas. So as long 
as that is set, the code path would be the same, regardless of the entities. So 
we only need to track quota entities for default callback, which we can do 
without modifying the interface. Hope I haven't missed the intention of 
including this in the interface.



##########
core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala:
##########
@@ -501,6 +505,156 @@ class ClientQuotaManagerTest extends 
BaseClientQuotaManagerTest {
     }
   }
 
+  @Test
+  def testQuotaTypesEnabledUpdatesWithDefaultCallback(): Unit = {
+    val clientQuotaManager = new ClientQuotaManager(config, metrics, 
QuotaType.CONTROLLER_MUTATION, time, "")
+    try {
+      // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and 
quotasEnabled should be false
+      assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
+      assertFalse(clientQuotaManager.quotasEnabled)
+
+      // Add a client-id quota, quotaTypesEnabled should be 
QuotaTypes.ClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(None, 
Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(5, true)))
+      assertEquals(QuotaTypes.ClientIdQuotaEnabled, 
clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a user quota, quotaTypesEnabled should be 
QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), 
None, Some(new Quota(5, true)))
+      assertEquals(QuotaTypes.UserQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a duplicate client-id quota, quotaTypesEnabled should remain 
unchanged
+      clientQuotaManager.updateQuota(None, 
Some(ClientQuotaManager.ClientIdEntity("client2")), Some(new Quota(5, true)))
+      assertEquals(QuotaTypes.UserQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add duplicate user quota, quotaTypesEnabled should remain unchanged
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), 
None, Some(new Quota(5, true)))
+      assertEquals(QuotaTypes.UserQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add a user-client-id quota, quotaTypesEnabled should be 
QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | 
QuotaTypes.UserQuotaEnabled
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), 
Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(10, true)))
+      assertEquals(QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, 
clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Add Duplicate user-client-id quota, quotaTypesEnabled should remain 
unchanged
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), 
Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true)))
+      assertEquals(QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, 
clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Remove the first user quota, quotaTypesEnabled should remain unchanged
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), 
None, None)
+      assertEquals(QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, 
clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Remove the second user quota, quotaTypesEnabled should be 
QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), 
None, None)
+      assertEquals(QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Remove the first client-id quota, quotaTypesEnabled should remain 
unchanged
+      clientQuotaManager.updateQuota(None, 
Some(ClientQuotaManager.ClientIdEntity("client1")), None)
+      assertEquals(QuotaTypes.UserClientIdQuotaEnabled | 
QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Remove the second client-id quota, quotaTypesEnabled should be 
QuotaTypes.UserClientIdQuotaEnabled
+      clientQuotaManager.updateQuota(None, 
Some(ClientQuotaManager.ClientIdEntity("client2")), None)
+      assertEquals(QuotaTypes.UserClientIdQuotaEnabled, 
clientQuotaManager.quotaTypesEnabled)
+      assertTrue(clientQuotaManager.quotasEnabled)
+
+      // Remove the first user-client-id quota, quotaTypesEnabled should be 
noQuotas as both user-client-id quotas has the same user client but different 
quota
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), 
Some(ClientQuotaManager.ClientIdEntity("client1")), None)
+      assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
+      assertFalse(clientQuotaManager.quotasEnabled)
+
+      // Remove the second user-client-id quota, quotaTypesEnabled should be 
QuotaTypes.NoQuotas and quotasEnabled should be false
+      
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), 
Some(ClientQuotaManager.ClientIdEntity("client1")), None)
+      assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
+      assertFalse(clientQuotaManager.quotasEnabled)
+    } finally {
+      clientQuotaManager.shutdown()
+    }
+  }
+
+  @Test
+  def testQuotaTypesEnabledUpdatesWithCustomCallback(): Unit = {

Review Comment:
   We don't rely on quotaTypesEnabled with custom callbacks, so don't need this 
test. Or change it to just verify that QuotaTypes.CustomQuotas is set.



##########
core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala:
##########
@@ -479,6 +479,8 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback 
with Reconfigurable w
 
   override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = 
customQuotasUpdated(quotaType).getAndSet(false)
 
+  override def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = 
Set.empty[ClientQuotaEntity].asJava

Review Comment:
   Not needed.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -647,6 +690,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag)
     }
 
+    override def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = {
+      overriddenQuotas.keySet().asScala.toSet.asJava

Review Comment:
   Unnecessary conversions, just need `overriddenQuotas.keySet()`



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +449,51 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Updates `quotaTypesEnabled` by performing a bitwise OR operation to 
combine the enabled quota types.
+   * This method ensures that the `quotaTypesEnabled` field reflects the 
active quota types based on the
+   * current state of `activeQuotaEntities`.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   *
+   *  @param quotaEntity The entity for which the quota is being updated, 
which can be a combination of user and client-id.
+   *  @param shouldAdd   A boolean indicating whether to add or remove the 
quota entity.
+   */
+  private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd: 
Boolean): Unit = {
+
+    val activeQuotaType = quotaEntity match {
+      case KafkaQuotaEntity(Some(_), Some(_)) => 
QuotaTypes.UserClientIdQuotaEnabled
+      case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled
+      case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled
+      case _ => QuotaTypes.NoQuotas
+    }
+
+    val isActive = quotaCallback.getActiveQuotasEntities.contains(quotaEntity)

Review Comment:
   Since we cannot put `getActiveQuotasEntities` in the interface, we can 
instead do:
   ```
   val isActive = quotaCallback match {
         case callback: DefaultQuotaCallback => 
callback.getActiveQuotasEntities.contains(quotaEntity)
         case _ => true
       }
   ```



##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java:
##########
@@ -154,6 +155,11 @@ public boolean updateClusterMetadata(Cluster cluster) {
             return true;
         }
 
+        @Override
+        public Set<ClientQuotaEntity> getActiveQuotasEntities() {
+            return Set.of();
+        }

Review Comment:
   We shouldn't rely on updating custom quota callbacks since users may have 
their own implementations.



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -1759,6 +1759,8 @@ class DummyClientQuotaCallback extends 
ClientQuotaCallback with Reconfigurable {
 
   override def close(): Unit = {}
 
+  override def getActiveQuotasEntities: util.Set[quota.ClientQuotaEntity] = 
util.Collections.emptySet()

Review Comment:
   Not needed.



##########
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##########
@@ -451,6 +449,51 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Updates `quotaTypesEnabled` by performing a bitwise OR operation to 
combine the enabled quota types.
+   * This method ensures that the `quotaTypesEnabled` field reflects the 
active quota types based on the
+   * current state of `activeQuotaEntities`.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then 
quotaTypesEnabled =  (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
+   *
+   *  @param quotaEntity The entity for which the quota is being updated, 
which can be a combination of user and client-id.
+   *  @param shouldAdd   A boolean indicating whether to add or remove the 
quota entity.
+   */
+  private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd: 
Boolean): Unit = {
+
+    val activeQuotaType = quotaEntity match {
+      case KafkaQuotaEntity(Some(_), Some(_)) => 
QuotaTypes.UserClientIdQuotaEnabled
+      case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled
+      case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled
+      case _ => QuotaTypes.NoQuotas
+    }
+
+    val isActive = quotaCallback.getActiveQuotasEntities.contains(quotaEntity)
+    if (shouldAdd && !isActive) {
+      activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if 
(currentValue == 0) 1 else currentValue + 1)
+      quotaTypesEnabled |= activeQuotaType
+    } else if (!shouldAdd && isActive) {
+      activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if 
(currentValue <= 1) 0 else currentValue - 1)
+      if (activeQuotaEntities.get(activeQuotaType) == 0) {
+        quotaTypesEnabled &= ~activeQuotaType
+      }
+    }
+
+    val quotaTypes = List(
+      QuotaTypes.UserClientIdQuotaEnabled -> "UserClientIdQuota",
+      QuotaTypes.ClientIdQuotaEnabled     -> "ClientIdQuota",
+      QuotaTypes.UserQuotaEnabled         -> "UserQuota"
+    )
+
+    val activeEntities = quotaTypes.collect {
+      case (k, name) if activeQuotaEntities.get(k) > 0 => name
+    }.mkString(", ")
+    info(s"Quota types enabled has been changed to $quotaTypesEnabled with 
active quota entities: [$activeEntities]")

Review Comment:
   We should probably only do `updateQuotaTypes()` only if custom quotas are 
not used. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to