rajinisivaram commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2167158972
########## clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaCallback.java: ########## @@ -106,6 +107,14 @@ public interface ClientQuotaCallback extends Configurable { */ boolean updateClusterMetadata(Cluster cluster); + /** + * Return a set of active client quota entities, which represent the quotas currently in effect and applicable. + * If the callback does not track active client quotas, it may return an empty set. + * + * @return a set of active client quota entities + */ + Set<ClientQuotaEntity> getActiveQuotasEntities(); Review Comment: This is public API, we cannot change it without a KIP. I don't think we need this in the interface anyway. When custom callbacks are used, the code that relies on quotaTypesEnabled doesn't care about any of the types except QuotaTypes.CustomQuotas. So as long as that is set, the code path would be the same, regardless of the entities. So we only need to track quota entities for default callback, which we can do without modifying the interface. Hope I haven't missed the intention of including this in the interface. 
########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -501,6 +505,156 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } + @Test + def testQuotaTypesEnabledUpdatesWithDefaultCallback(): Unit = { + val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, "") + try { + // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Add a client-id quota, quotaTypesEnabled should be QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(5, true))) + assertEquals(QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user quota, quotaTypesEnabled should be QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, Some(new Quota(5, true))) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a duplicate client-id quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client2")), Some(new Quota(5, true))) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add duplicate user quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), None, Some(new Quota(5, true))) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + 
assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user-client-id quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(10, true))) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add Duplicate user-client-id quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true))) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the first user quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the second user quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), None, None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the first client-id quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), None) + 
assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the second client-id quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client2")), None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the first user-client-id quota, quotaTypesEnabled should be noQuotas as both user-client-id quotas has the same user client but different quota + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None) + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Remove the second user-client-id quota, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None) + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + } finally { + clientQuotaManager.shutdown() + } + } + + @Test + def testQuotaTypesEnabledUpdatesWithCustomCallback(): Unit = { Review Comment: We don't rely on quotaTypesEnabled with custom callbacks, so don't need this test. Or change it to just verify that QuotaTypes.CustomQuotas is set. 
########## core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala: ########## @@ -479,6 +479,8 @@ class GroupedUserQuotaCallback extends ClientQuotaCallback with Reconfigurable w override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = customQuotasUpdated(quotaType).getAndSet(false) + override def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = Set.empty[ClientQuotaEntity].asJava Review Comment: Not needed. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -647,6 +690,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag) } + override def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = { + overriddenQuotas.keySet().asScala.toSet.asJava Review Comment: Unnecessary conversions, just need `overriddenQuotas.keySet()` ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +449,51 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Updates `quotaTypesEnabled` by performing a bitwise OR operation to combine the enabled quota types. + * This method ensures that the `quotaTypesEnabled` field reflects the active quota types based on the + * current state of `activeQuotaEntities`. + * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + * + * @param quotaEntity The entity for which the quota is being updated, which can be a combination of user and client-id. + * @param shouldAdd A boolean indicating whether to add or remove the quota entity. 
+ */ + private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd: Boolean): Unit = { + + val activeQuotaType = quotaEntity match { + case KafkaQuotaEntity(Some(_), Some(_)) => QuotaTypes.UserClientIdQuotaEnabled + case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled + case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled + case _ => QuotaTypes.NoQuotas + } + + val isActive = quotaCallback.getActiveQuotasEntities.contains(quotaEntity) Review Comment: Since we cannot put `getActiveQuotasEntities` in the interface, we can instead do: ``` val isActive = quotaCallback match { case callback: DefaultQuotaCallback => callback.getActiveQuotasEntities.contains(quotaEntity) case _ => true } ``` ########## clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java: ########## @@ -154,6 +155,11 @@ public boolean updateClusterMetadata(Cluster cluster) { return true; } + @Override + public Set<ClientQuotaEntity> getActiveQuotasEntities() { + return Set.of(); + } Review Comment: We shouldn't rely on updating custom quota callbacks since users may have their own implementations. ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -1759,6 +1759,8 @@ class DummyClientQuotaCallback extends ClientQuotaCallback with Reconfigurable { override def close(): Unit = {} + override def getActiveQuotasEntities: util.Set[quota.ClientQuotaEntity] = util.Collections.emptySet() Review Comment: Not needed. ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +449,51 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Updates `quotaTypesEnabled` by performing a bitwise OR operation to combine the enabled quota types. + * This method ensures that the `quotaTypesEnabled` field reflects the active quota types based on the + * current state of `activeQuotaEntities`. 
+ * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + * + * @param quotaEntity The entity for which the quota is being updated, which can be a combination of user and client-id. + * @param shouldAdd A boolean indicating whether to add or remove the quota entity. + */ + private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd: Boolean): Unit = { + + val activeQuotaType = quotaEntity match { + case KafkaQuotaEntity(Some(_), Some(_)) => QuotaTypes.UserClientIdQuotaEnabled + case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled + case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled + case _ => QuotaTypes.NoQuotas + } + + val isActive = quotaCallback.getActiveQuotasEntities.contains(quotaEntity) + if (shouldAdd && !isActive) { + activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if (currentValue == 0) 1 else currentValue + 1) + quotaTypesEnabled |= activeQuotaType + } else if (!shouldAdd && isActive) { + activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if (currentValue <= 1) 0 else currentValue - 1) + if (activeQuotaEntities.get(activeQuotaType) == 0) { + quotaTypesEnabled &= ~activeQuotaType + } + } + + val quotaTypes = List( + QuotaTypes.UserClientIdQuotaEnabled -> "UserClientIdQuota", + QuotaTypes.ClientIdQuotaEnabled -> "ClientIdQuota", + QuotaTypes.UserQuotaEnabled -> "UserQuota" + ) + + val activeEntities = quotaTypes.collect { + case (k, name) if activeQuotaEntities.get(k) > 0 => name + }.mkString(", ") + info(s"Quota types enabled has been changed to $quotaTypesEnabled with active quota entities: [$activeEntities]") Review Comment: We should 
probably do `updateQuotaTypes()` only if custom quotas are not used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org