ahuang98 commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2101129753
########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -428,18 +426,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { - if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled - else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - + val quotaTypes = (userEntity.nonEmpty, clientEntity.nonEmpty) match { + case (true, true) => QuotaTypes.UserClientIdQuotaEnabled + case (true, false) => QuotaTypes.UserQuotaEnabled + case (false, true) => QuotaTypes.ClientIdQuotaEnabled + case (false, false) => QuotaTypes.NoQuotas + } quota match { - case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) - case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) + case Some(newQuota) => + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + updateQuotaTypes(quotaTypes, increment = true) + case None => + quotaCallback.removeQuota(clientQuotaType, quotaEntity) // change here. Review Comment: don't forget to remove the code comment ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -428,18 +426,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { - if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled - else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - + val quotaTypes = (userEntity.nonEmpty, clientEntity.nonEmpty) match { + case (true, true) => QuotaTypes.UserClientIdQuotaEnabled + case (true, false) => QuotaTypes.UserQuotaEnabled + case (false, true) => QuotaTypes.ClientIdQuotaEnabled + case (false, false) => QuotaTypes.NoQuotas Review Comment: just want to clarify my understanding of NoQuotas - if this method were called with a non-empty `quota` but empty `userEntity` and empty `clientEntity`, would that essentially be a no-op? assuming that combination is not expected/possible ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } + @Test + def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = { + val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, "") + try { + // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.getQuotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Add a client-id quota and quotaTypesEnabled should be QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(5, true)) + ) + assertEquals(QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + Review Comment: could we add another case here where we add another `ClientIdQuota` and show that `getQuotaTypesEnabled` still remains `QuotaTypes.ClientIdQuotaEnabled`? ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +452,43 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quota types counts and quotaTypesEnabled flag. + * @param quotaTypeKey The QuotaTypes constant (e.g., QuotaTypes.UserClientIdQuotaEnabled) + * @param increment True to increment count, false to decrement + */ + private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = { + if (quotaTypeKey == QuotaTypes.NoQuotas) { + return + } + val previousQuotaTypesEnabled = quotaTypesEnabled + + // Update activeQuotaTypes counts + activeQuotaTypes.compute(quotaTypeKey, (_, count) => + if (increment) Option(count).getOrElse(0) + 1 + else if (Option(count).exists(_ > 1)) count - 1 + else 0 + ) + + // Update quotaTypesEnabled based on activeQuotaTypes counts + quotaTypesEnabled = clientQuotaCallbackPlugin match { + case Some(_) => QuotaTypes.CustomQuotas + case None => + var newQuotaTypes = QuotaTypes.NoQuotas + if (activeQuotaTypes.getOrDefault(QuotaTypes.UserClientIdQuotaEnabled, 0) > 0) + newQuotaTypes |= QuotaTypes.UserClientIdQuotaEnabled + if (activeQuotaTypes.getOrDefault(QuotaTypes.UserQuotaEnabled, 0) > 0) + newQuotaTypes |= QuotaTypes.UserQuotaEnabled + if (activeQuotaTypes.getOrDefault(QuotaTypes.ClientIdQuotaEnabled, 0) > 0) + newQuotaTypes |= QuotaTypes.ClientIdQuotaEnabled + newQuotaTypes + } + + if (previousQuotaTypesEnabled != quotaTypesEnabled) { + debug(s"Quota types enabled changed from $previousQuotaTypesEnabled to $quotaTypesEnabled") Review Comment: let's say we have positive counts for QuotaTypes.UserClientIdQuotaEnabled and QuotaTypes.UserQuotaEnabled, when we print the `quotaTypesEnabled` we would expect to see `6` printed? ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +452,43 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quota types counts and quotaTypesEnabled flag. + * @param quotaTypeKey The QuotaTypes constant (e.g., QuotaTypes.UserClientIdQuotaEnabled) + * @param increment True to increment count, false to decrement + */ + private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = { + if (quotaTypeKey == QuotaTypes.NoQuotas) { + return + } + val previousQuotaTypesEnabled = quotaTypesEnabled + + // Update activeQuotaTypes counts + activeQuotaTypes.compute(quotaTypeKey, (_, count) => + if (increment) Option(count).getOrElse(0) + 1 + else if (Option(count).exists(_ > 1)) count - 1 + else 0 + ) + + // Update quotaTypesEnabled based on activeQuotaTypes counts + quotaTypesEnabled = clientQuotaCallbackPlugin match { + case Some(_) => QuotaTypes.CustomQuotas + case None => + var newQuotaTypes = QuotaTypes.NoQuotas + if (activeQuotaTypes.getOrDefault(QuotaTypes.UserClientIdQuotaEnabled, 0) > 0) + newQuotaTypes |= QuotaTypes.UserClientIdQuotaEnabled Review Comment: should we just make this `newQuotaTypes = QuotaTypes.UserClientIdQuotaEnabled`? ########## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ########## @@ -451,6 +452,43 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Helper method to update quota types counts and quotaTypesEnabled flag. + * @param quotaTypeKey The QuotaTypes constant (e.g., QuotaTypes.UserClientIdQuotaEnabled) + * @param increment True to increment count, false to decrement + */ + private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = { + if (quotaTypeKey == QuotaTypes.NoQuotas) { + return + } + val previousQuotaTypesEnabled = quotaTypesEnabled + + // Update activeQuotaTypes counts + activeQuotaTypes.compute(quotaTypeKey, (_, count) => + if (increment) Option(count).getOrElse(0) + 1 + else if (Option(count).exists(_ > 1)) count - 1 + else 0 + ) + + // Update quotaTypesEnabled based on activeQuotaTypes counts Review Comment: maybe would be helpful if this comment explained the bitwise operation, potentially with an example. e.g. quotaTypesEnabled is the bitwise representation of what quota types are enabled. For instance, if quotaTypesEnabled == 6, then we know we have UserClientIdQuotaEnabled and UserQuotaEnabled since their bit representations are 4 and 2. (4 | 2 = 6) ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } + @Test + def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = { + val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, "") + try { + // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.getQuotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Add a client-id quota and quotaTypesEnabled should be QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(5, true)) + ) + assertEquals(QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user quota and quotaTypesEnabled should be QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + None, + Some(new Quota(5, true)) + ) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user-client-id quota and quotaTypesEnabled should + // be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userB")), + Some(ClientQuotaManager.ClientIdEntity("client2")), + Some(new Quota(10, true)) + ) + assertEquals( + QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, + clientQuotaManager.getQuotaTypesEnabled + ) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the user quota and quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled Review Comment: with the above suggestions, the remove cases become more interesting too - after removing just one `ClientIdQuota` we would expect that the quotaTypesEnabled does not change yet ########## core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala: ########## @@ -501,6 +501,76 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } + @Test + def testQuotaTypesEnabledUpdatesOnAddAndRemove(): Unit = { + val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, "") + try { + // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.getQuotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Add a client-id quota and quotaTypesEnabled should be QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota( + None, + Some(ClientQuotaManager.ClientIdEntity("client1")), + Some(new Quota(5, true)) + ) + assertEquals(QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user quota and quotaTypesEnabled should be QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userA")), + None, + Some(new Quota(5, true)) + ) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.getQuotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user-client-id quota and quotaTypesEnabled should + // be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled + clientQuotaManager.updateQuota( + Some(ClientQuotaManager.UserEntity("userB")), + Some(ClientQuotaManager.ClientIdEntity("client2")), + Some(new Quota(10, true)) + ) + assertEquals( + QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, + clientQuotaManager.getQuotaTypesEnabled + ) + assertTrue(clientQuotaManager.quotasEnabled) + Review Comment: could we add another case here where we add a duplicate quota type (e.g. UserClientIdQuota) and show that quotasEnabled still remains the same value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org