cmccabe commented on a change in pull request #10254:
URL: https://github.com/apache/kafka/pull/10254#discussion_r607267726



##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -212,4 +214,103 @@ class RaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testClientQuotas(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == 
BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava)
+        var filter = ClientQuotaFilter.containsOnly(
+          List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+        def alterThenDescribe(entity: ClientQuotaEntity,
+                              quotas: Seq[ClientQuotaAlteration.Op],
+                              filter: ClientQuotaFilter,
+                              expectCount: Int): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+          val alterResult = admin.alterClientQuotas(Seq(new 
ClientQuotaAlteration(entity, quotas.asJava)).asJava)
+          try {
+            alterResult.all().get()
+          } catch {
+            case t: Throwable => fail("AlterClientQuotas request failed", t)
+          }
+
+          def describeOrFail(filter: ClientQuotaFilter): 
java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = {
+            try {
+              admin.describeClientQuotas(filter).entities().get()
+            } catch {
+              case t: Throwable => fail("DescribeClientQuotas request failed", 
t)
+            }
+          }
+
+          val (describeResult, ok) = 
TestUtils.computeUntilTrue(describeOrFail(filter)) {
+            results => results.getOrDefault(entity, 
java.util.Collections.emptyMap[String, java.lang.Double]()).size() == 
expectCount
+          }
+          assertTrue(ok, "Broker never saw new client quotas")
+          describeResult
+        }
+
+        var describeResult = alterThenDescribe(entity,
+          Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), 
filter, 1)
+        assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+        describeResult = alterThenDescribe(entity, Seq(
+          new ClientQuotaAlteration.Op("request_percentage", 0.97),
+          new ClientQuotaAlteration.Op("producer_byte_rate", 10000),
+          new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
+        ), filter, 3)
+        assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+        assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+        assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
+
+        describeResult = alterThenDescribe(entity, Seq(
+          new ClientQuotaAlteration.Op("request_percentage", 0.95),
+          new ClientQuotaAlteration.Op("producer_byte_rate", null),
+          new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+        ), filter, 1)
+        assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6)
+
+        describeResult = alterThenDescribe(entity, Seq(
+          new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
+
+        describeResult = alterThenDescribe(entity,
+          Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9999)), 
filter, 1)
+        assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
+
+        // Add another quota for a different entity with same user part
+        val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", 
"client-id" -> "some-client").asJava)
+        filter = ClientQuotaFilter.containsOnly(
+          List(
+            ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+            ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
+          ).asJava)
+        describeResult = alterThenDescribe(entity2,
+          Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), 
filter, 1)
+        assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
+
+        // non-strict match
+        filter = ClientQuotaFilter.contains(
+          List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
+
+        val (describeResult2, ok) = 
TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) 
{
+          results => results.size() == 2
+        }
+        assertTrue(ok, "Broker never saw two client quotas")

Review comment:
       The "ok" check feels a little clunky here. Plus, if this fails, we don't
get any information about what went wrong.
   
   In the past I've just used `TestUtils.retry` plus a mutable variable to get
both a result and a timed assertion check. Maybe we could come up with a version
of that function that returns a value, which would be a bit more elegant. In
general, it would be nice to get the actual exception on a failure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to