junrao commented on code in PR #15889: URL: https://github.com/apache/kafka/pull/15889#discussion_r1605350575
########## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ########## @@ -110,40 +111,43 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { - if (now - sample.lastWindowMs >= expireAge) + if (now - sample.lastEventMs >= expireAge) sample.reset(now); } } protected static class Sample { public double initialValue; public long eventCount; - public long lastWindowMs; + public long startTimeMs; + public long lastEventMs; public double value; public Sample(double initialValue, long now) { this.initialValue = initialValue; this.eventCount = 0; - this.lastWindowMs = now; + this.startTimeMs = now; + this.lastEventMs = now; this.value = initialValue; } public void reset(long now) { this.eventCount = 0; - this.lastWindowMs = now; + this.startTimeMs = now; + this.lastEventMs = now; this.value = initialValue; } public boolean isComplete(long timeMs, MetricConfig config) { - return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); + return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow(); } @Override public String toString() { return "Sample(" + "value=" + value + ", eventCount=" + eventCount + - ", lastWindowMs=" + lastWindowMs + + ", startTimeMs=" + startTimeMs + Review Comment: Should we add lastEventMs? ########## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ########## @@ -110,40 +111,43 @@ public String toString() { protected void purgeObsoleteSamples(MetricConfig config, long now) { long expireAge = config.samples() * config.timeWindowMs(); for (Sample sample : samples) { - if (now - sample.lastWindowMs >= expireAge) + if (now - sample.lastEventMs >= expireAge) Review Comment: Could we add a comment that we don't purge overlapping samples? 
########## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ########## @@ -82,7 +82,7 @@ public long windowSize(MetricConfig config, long now) { * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called, * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results. */ - long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs; + long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs; Review Comment: Could we add some comments to note that the total elapsed time could be larger than the window size since we keep the overlapping samples? ########## clients/src/test/java/org/apache/kafka/common/metrics/stats/RateTest.java: ########## @@ -64,4 +69,30 @@ public void testRateWithNoPriorAvailableSamples(int numSample, int sampleWindowS double expectedRatePerSec = sampleValue / windowSize; assertEquals(expectedRatePerSec, observedRate, EPS); } + + // Record an event every 100 ms on average, moving some 1 ms back or forth for fine-grained + // window control. The expected rate, hence, is 10-11 events/sec depending on the moment of + // measurement. Start assertions from the second window. 
+ @Test + public void testRateIsConsistentAfterTheFirstWindow() { + MetricConfig config = new MetricConfig().timeWindow(1, SECONDS).samples(2); + List<Integer> steps = Arrays.asList(0, 99, 100, 100, 100, 100, 100, 100, 100, 100, 100); + + // start the first window and record events at 0,99,199,...,999 ms + for (int stepMs : steps) { + time.sleep(stepMs); + rate.record(config, 1, time.milliseconds()); + } + + // making a gap of 100 ms between windows + time.sleep(101); + + // start the second window and record events at 0,99,199,...,999 ms + for (int stepMs : steps) { + time.sleep(stepMs); + rate.record(config, 1, time.milliseconds()); + double observedRate = rate.measure(config, time.milliseconds()); Review Comment: Should we do a second measurement and check that the result doesn't change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org