cmccabe commented on a change in pull request #10254: URL: https://github.com/apache/kafka/pull/10254#discussion_r607267726
########## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ########## @@ -212,4 +214,103 @@ class RaftClusterTest { cluster.close() } } + + @Test + def testClientQuotas(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()).build() + try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).currentState() == BrokerState.RUNNING, + "Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { + val entity = new ClientQuotaEntity(Map("user" -> "testkit").asJava) + var filter = ClientQuotaFilter.containsOnly( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + + def alterThenDescribe(entity: ClientQuotaEntity, + quotas: Seq[ClientQuotaAlteration.Op], + filter: ClientQuotaFilter, + expectCount: Int): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { + val alterResult = admin.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, quotas.asJava)).asJava) + try { + alterResult.all().get() + } catch { + case t: Throwable => fail("AlterClientQuotas request failed", t) + } + + def describeOrFail(filter: ClientQuotaFilter): java.util.Map[ClientQuotaEntity, java.util.Map[String, java.lang.Double]] = { + try { + admin.describeClientQuotas(filter).entities().get() + } catch { + case t: Throwable => fail("DescribeClientQuotas request failed", t) + } + } + + val (describeResult, ok) = TestUtils.computeUntilTrue(describeOrFail(filter)) { + results => results.getOrDefault(entity, java.util.Collections.emptyMap[String, java.lang.Double]()).size() == expectCount + } + assertTrue(ok, "Broker never saw new client quotas") + describeResult + } + + var describeResult = alterThenDescribe(entity, + Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), filter, 1) + assertEquals(0.99, describeResult.get(entity).get("request_percentage"), 1e-6) + + describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.97), + new ClientQuotaAlteration.Op("producer_byte_rate", 10000), + new ClientQuotaAlteration.Op("consumer_byte_rate", 10001) + ), filter, 3) + assertEquals(0.97, describeResult.get(entity).get("request_percentage"), 1e-6) + assertEquals(10000.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) + assertEquals(10001.0, describeResult.get(entity).get("consumer_byte_rate"), 1e-6) + + describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", 0.95), + new ClientQuotaAlteration.Op("producer_byte_rate", null), + new ClientQuotaAlteration.Op("consumer_byte_rate", null) + ), filter, 1) + assertEquals(0.95, describeResult.get(entity).get("request_percentage"), 1e-6) + + describeResult = alterThenDescribe(entity, Seq( + new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0) + + describeResult = alterThenDescribe(entity, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9999)), filter, 1) + assertEquals(9999.0, describeResult.get(entity).get("producer_byte_rate"), 1e-6) + + // Add another quota for a different entity with same user part + val entity2 = new ClientQuotaEntity(Map("user" -> "testkit", "client-id" -> "some-client").asJava) + filter = ClientQuotaFilter.containsOnly( + List( + ClientQuotaFilterComponent.ofEntity("user", "testkit"), + ClientQuotaFilterComponent.ofEntity("client-id", "some-client"), + ).asJava) + describeResult = alterThenDescribe(entity2, + Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), filter, 1) + assertEquals(9998.0, describeResult.get(entity2).get("producer_byte_rate"), 1e-6) + + // non-strict match + filter = ClientQuotaFilter.contains( + List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava) + + val (describeResult2, ok) = TestUtils.computeUntilTrue(admin.describeClientQuotas(filter).entities().get()) { + results => results.size() == 2 + } + assertTrue(ok, "Broker never saw two client quotas") Review comment: The "ok" check feels a little clunky here. Plus if this fails, we don't get any information. In the past I've just used `TestUtils.retry` plus a mutable variable to get a result + timed assertion check. Maybe we could come up with a version of that function that returned a value to be a bit more elegant. In general it would be nice to get the actual exception on a failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org