junrao commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1608743345


##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -50,10 +50,11 @@ public void record(MetricConfig config, double value, long 
timeMs) {
             sample = advance(config, timeMs);
         update(sample, config, value, timeMs);
         sample.eventCount += 1;
+        sample.lastEventMs = timeMs;
     }
 
     private Sample advance(MetricConfig config, long timeMs) {
-        this.current = (this.current + 1) % config.samples();
+        this.current = (this.current + 1) % (config.samples() + 1);

Review Comment:
   It would be useful to add a comment explaining why we keep one more sample 
than configured.
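
   The hunk above does not state the reason for the extra slot, so the sketch 
below only shows the mechanical effect of the modulus change: the index now 
cycles through config.samples() + 1 slots instead of config.samples(). The 
class and helper names are made up for illustration and are not part of Kafka.

```java
import java.util.StringJoiner;

class ExtraSampleSketch {
    // Same arithmetic as advance(): step the index through a fixed number of slots.
    static int next(int current, int slots) {
        return (current + 1) % slots;
    }

    // Hypothetical helper: lists the indices visited over `steps` advances from 0.
    static String cycle(int slots, int steps) {
        StringJoiner visited = new StringJoiner(" ");
        int current = 0;
        for (int i = 0; i < steps; i++) {
            current = next(current, slots);
            visited.add(Integer.toString(current));
        }
        return visited.toString();
    }

    public static void main(String[] args) {
        int configured = 2; // stands in for config.samples()
        System.out.println(cycle(configured, 6));     // 1 0 1 0 1 0
        System.out.println(cycle(configured + 1, 6)); // 1 2 0 1 2 0
    }
}
```

   With the extra slot, a sample is overwritten only after config.samples() 
further advances rather than config.samples() - 1.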



##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##########
@@ -72,29 +67,12 @@ public long windowSize(MetricConfig config, long now) {
         stat.purgeObsoleteSamples(config, now);
 
         /*
-         * Here we check the total amount of time elapsed since the oldest 
non-obsolete window.
-         * This give the total windowSize of the batch which is the time used 
for Rate computation.
-         * However, there is an issue if we do not have sufficient data for 
e.g. if only 1 second has elapsed in a 30 second
-         * window, the measured rate will be very high.
-         * Hence we assume that the elapsed time is always N-1 complete 
windows plus whatever fraction of the final window is complete.
-         *
-         * Note that we could simply count the amount of time elapsed in the 
current window and add n-1 windows to get the total time,
-         * but this approach does not account for sleeps. SampledStat only 
creates samples whenever record is called,
-         * if no record is called for a period of time that time is not 
accounted for in windowSize and produces incorrect results.
+         * Purging process above guarantees to keep all events starting from
+         * earliest(monitoredWindow start, oldestSample start). Use the 
largest as windowSize.
          */
-        long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
-        // Check how many full windows of data we have currently retained
-        int numFullWindows = (int) (totalElapsedTimeMs / 
config.timeWindowMs());
-        int minFullWindows = config.samples() - 1;
-
-        // If the available windows are less than the minimum required, add 
the difference to the totalElapsedTime
-        if (numFullWindows < minFullWindows)
-            totalElapsedTimeMs += (minFullWindows - numFullWindows) * 
config.timeWindowMs();
-
-        // If window size is being calculated at the exact beginning of the 
window with no prior samples, the window size
-        // will result in a value of 0. Calculation of rate over a window is 
size 0 is undefined, hence, we assume the
-        // minimum window size to be at least 1ms.
-        return Math.max(totalElapsedTimeMs, 1);
+        long monitoredWindow = config.timeWindowMs() * config.samples();

Review Comment:
   Hmm, this changes the existing logic a bit. The existing logic makes sure 
that we include at least config.samples() - 1 full windows. The last one could 
be partial.
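
   For reference, the removed lines can be replayed as a standalone sketch. The 
method below copies the old arithmetic from the hunk; the class name, the method 
name, and the parameter oldestWindowStartMs (standing in for 
stat.oldest(now).lastWindowMs) are made up for illustration and are not Kafka 
API.

```java
class OldWindowSizeSketch {
    // Mirrors the lines removed from Rate.windowSize in the hunk above.
    static long oldWindowSize(long now, long oldestWindowStartMs, long timeWindowMs, int samples) {
        long totalElapsedTimeMs = now - oldestWindowStartMs;
        // How many full windows of data are currently retained
        int numFullWindows = (int) (totalElapsedTimeMs / timeWindowMs);
        int minFullWindows = samples - 1;

        // Pad the elapsed time up to samples - 1 full windows if fewer are available
        if (numFullWindows < minFullWindows)
            totalElapsedTimeMs += (minFullWindows - numFullWindows) * timeWindowMs;

        // Never report a zero-length window
        return Math.max(totalElapsedTimeMs, 1);
    }

    public static void main(String[] args) {
        // 30 s windows, 2 samples, only 1 s elapsed: padded by one full window
        System.out.println(oldWindowSize(1_000, 0, 30_000, 2));  // 31000
        // 45 s elapsed: one full window already retained, no padding
        System.out.println(oldWindowSize(45_000, 0, 30_000, 2)); // 45000
        // a single sample and no elapsed time: clamped to 1 ms
        System.out.println(oldWindowSize(0, 0, 30_000, 1));      // 1
    }
}
```

   The first case is the one described above: config.samples() - 1 full windows 
plus a partial last window of 1 s.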



##########
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##########
@@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
         double expectedRatePerSec = sampleValue / windowSize;
         assertEquals(expectedRatePerSec, observedRate, EPS);
     }
+
+    // Record an event every 100 ms on average, moving some 1 ms back or forth 
for fine-grained 
+    // window control. The expected rate, hence, is 10-11 events/sec depending 
on the moment of 
+    // measurement. Start assertions from the second window.
+    @Test
+    public void testRateIsConsistentAfterTheFirstWindow() {
+        MetricConfig config = new MetricConfig().timeWindow(1, 
SECONDS).samples(2);
+        List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 
100, 100, 100, 100);
+
+        // start the first window and record events at 0,99,199,...,999 ms 
+        for (int stepMs : steps) {
+            time.sleep(stepMs);
+            rate.record(config, 1, time.milliseconds());
+        }
+
+        // making a gap of 100 ms between windows
+        time.sleep(101);
+
+        // start the second window and record events at 0,99,199,...,999 ms
+        for (int stepMs : steps) {
+            time.sleep(stepMs);
+            rate.record(config, 1, time.milliseconds());
+            double observedRate = rate.measure(config, time.milliseconds());

Review Comment:
   Yes, it's probably useful to assert that taking a second measurement with no 
time change leads to the same value. This mainly guards against incorrect 
future changes, and it adds little overhead.
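
   A minimal sketch of that kind of assertion, using a toy stat rather than the 
Kafka classes. ToyRate and its methods are hypothetical; the only point is that 
measure() mutates state (it purges old events), so measuring twice at the same 
timestamp is a meaningful check.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class RepeatedMeasureSketch {
    // Hypothetical toy stat: stores event timestamps and purges old ones on measure().
    static final class ToyRate {
        private final Deque<Long> eventsMs = new ArrayDeque<>();
        private final long windowMs;

        ToyRate(long windowMs) {
            this.windowMs = windowMs;
        }

        void record(long nowMs) {
            eventsMs.addLast(nowMs);
        }

        // Events per second over the last windowMs; drops events that fell out of the window.
        double measure(long nowMs) {
            while (!eventsMs.isEmpty() && eventsMs.peekFirst() <= nowMs - windowMs)
                eventsMs.removeFirst();
            return eventsMs.size() * 1000.0 / windowMs;
        }
    }

    public static void main(String[] args) {
        ToyRate rate = new ToyRate(2_000);
        for (long t = 0; t <= 3_000; t += 100)
            rate.record(t);

        long now = 3_000;
        double first = rate.measure(now);
        double second = rate.measure(now); // no time change between the two calls
        if (first != second)
            throw new AssertionError("measure() is not repeatable: " + first + " vs " + second);
        System.out.println("repeatable: " + first);
    }
}
```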



##########
clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java:
##########
@@ -64,4 +69,31 @@ public void testRateWithNoPriorAvailableSamples(int 
numSample, int sampleWindowS
         double expectedRatePerSec = sampleValue / windowSize;
         assertEquals(expectedRatePerSec, observedRate, EPS);
     }
+
+    // Record an event every 100 ms on average, moving some 1 ms back or forth 
for fine-grained 
+    // window control. The expected rate, hence, is 10-11 events/sec depending 
on the moment of 
+    // measurement. Start assertions from the second window. This test is to 
address past issue,
+    // when measurements in the end of the sample led to value spikes.

Review Comment:
   How about changing "This test is to address past issue, when measurements in 
the end of the sample led to value spikes." to something like "This test covers 
the case where a sample window partially overlaps with the monitored window."?


