junrao commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1605350575


##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -110,40 +111,43 @@ public String toString() {
     protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (Sample sample : samples) {
-            if (now - sample.lastWindowMs >= expireAge)
+            if (now - sample.lastEventMs >= expireAge)
                 sample.reset(now);
         }
     }
 
     protected static class Sample {
         public double initialValue;
         public long eventCount;
-        public long lastWindowMs;
+        public long startTimeMs;
+        public long lastEventMs;
         public double value;
 
         public Sample(double initialValue, long now) {
             this.initialValue = initialValue;
             this.eventCount = 0;
-            this.lastWindowMs = now;
+            this.startTimeMs = now;
+            this.lastEventMs = now;
             this.value = initialValue;
         }
 
         public void reset(long now) {
             this.eventCount = 0;
-            this.lastWindowMs = now;
+            this.startTimeMs = now;
+            this.lastEventMs = now;
             this.value = initialValue;
         }
 
         public boolean isComplete(long timeMs, MetricConfig config) {
-            return timeMs - lastWindowMs >= config.timeWindowMs() || 
eventCount >= config.eventWindow();
+            return timeMs - startTimeMs >= config.timeWindowMs() || eventCount 
>= config.eventWindow();
         }
 
         @Override
         public String toString() {
             return "Sample(" +
                 "value=" + value +
                 ", eventCount=" + eventCount +
-                ", lastWindowMs=" + lastWindowMs +
+                ", startTimeMs=" + startTimeMs +

Review Comment:
   Should we add lastEventMs?



##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -110,40 +111,43 @@ public String toString() {
     protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (Sample sample : samples) {
-            if (now - sample.lastWindowMs >= expireAge)
+            if (now - sample.lastEventMs >= expireAge)

Review Comment:
   Could we add a comment that we don't purge overlapping samples?



##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##########
@@ -82,7 +82,7 @@ public long windowSize(MetricConfig config, long now) {
          * but this approach does not account for sleeps. SampledStat only 
creates samples whenever record is called,
          * if no record is called for a period of time that time is not 
accounted for in windowSize and produces incorrect results.
          */
-        long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+        long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;

Review Comment:
   Could we add some comments to note that the total elapse time could be 
larger than the window size since we keep the overlapping samples?



##########
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##########
@@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
         double expectedRatePerSec = sampleValue / windowSize;
         assertEquals(expectedRatePerSec, observedRate, EPS);
     }
+
+    // Record an event every 100 ms on average, moving some 1 ms back or forth 
for fine-grained 
+    // window control. The expected rate, hence, is 10-11 events/sec depending 
on the moment of 
+    // measurement. Start assertions from the second window.
+    @Test
+    public void testRateIsConsistentAfterTheFirstWindow() {
+        MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+        List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 
100, 100, 100, 100);
+
+        // start the first window and record events at 0,99,199,...,999 ms 
+        for (int stepMs : steps) {
+            time.sleep(stepMs);
+            rate.record(config, 1, time.milliseconds());
+        }
+
+        // making a gap of 100 ms between windows
+        time.sleep(101);
+
+        // start the second window and record events at 0,99,199,...,999 ms
+        for (int stepMs : steps) {
+            time.sleep(stepMs);
+            rate.record(config, 1, time.milliseconds());
+            double observedRate = rate.measure(config, time.milliseconds());

Review Comment:
   Should we do a second measurement and check that the result doesn't change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to