dajac commented on a change in pull request #8977:
URL: https://github.com/apache/kafka/pull/8977#discussion_r449633467



##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
##########
@@ -209,4 +212,139 @@ public void shouldReturnPresenceOfMetrics() {
 
         assertThat(sensor.hasMetrics(), is(true));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testStrictQuotaEnforcement() {
+        final Time time = new MockTime(0, 0, 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(10))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0s to bring the avg rate to 9. Value 
is accepted
+        // because the quota is not exhausted yet.
+        sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+        assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a second value at T+1s to bring the avg rate to 18. Value 
is accepted
+        // because the quota is not exhausted yet.
+        time.sleep(1000);
+        sensor.record(90, time.milliseconds(), QuotaEnforcementType.STRICT);
+        assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a third value at T+2s is rejected immediately and rate is 
not updated
+        // because the quota is exhausted.
+        time.sleep(1000);
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.STRICT));
+        assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    @Test
+    public void testPermissiveQuotaEnforcement() {
+        final Time time = new MockTime(0, 0, 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(10))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0s to bring the avg rate to 9. Value 
is accepted
+        // because the quota is not exhausted yet.
+        sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE);
+        assertEquals(9, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a second value at T+1s to bring the avg rate to 18. Value 
is accepted
+        // and rate is updated even though the quota is exhausted.
+        time.sleep(1000);
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE));
+        assertEquals(18, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Recording a third value at T+2s to bring the avg rate to 27. Value is accepted
+        // and rate is updated even though the quota is exhausted.
+        time.sleep(1000);
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(90, time.milliseconds(), 
QuotaEnforcementType.PERMISSIVE));
+        assertEquals(27, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithDefaultRate() {
+        final Time time = new MockTime(0, 0, 0);
+        final Metrics metrics = new Metrics(time);
+        final Sensor sensor = metrics.sensor("sensor", new MetricConfig()
+            .quota(Quota.upperBound(2))
+            .timeWindow(1, TimeUnit.SECONDS)
+            .samples(11));
+        final MetricName metricName = metrics.metricName("rate", "test-group");
+        assertTrue(sensor.add(metricName, new Rate()));
+        final KafkaMetric rateMetric = metrics.metric(metricName);
+
+        // Recording a first value at T+0 to bring the avg rate to 3 which is 
already
+        // above the quota.
+        sensor.record(30, time.milliseconds(), QuotaEnforcementType.STRICT);
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        // Theoretically, we should wait 5s to bring back the avg rate to the defined quota:
+        // ((30 / 10) - 2) / 2 * 10 = 5s
+        time.sleep(5000);
+
+        // But, recording a second value is rejected because the avg rate is still equal
+        // to 3 after 5s.
+        assertThrows(QuotaViolationException.class,
+            () -> sensor.record(30, time.milliseconds(), 
QuotaEnforcementType.STRICT));
+        assertEquals(3, rateMetric.measurableValue(time.milliseconds()), 0.1);
+
+        metrics.close();
+    }
+
+    @Test
+    public void testStrictQuotaEnforcementWithTokenBucketBasedRate() {

Review comment:
       This test shows that the solution actually resolves the problem.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to