ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430707581
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -223,6 +223,9 @@ public StateStore getStateStore(final String name) {
                                final V value) {
         setCurrentNode(child);
         child.process(key, value);
+        if (child.children().isEmpty()) {

Review comment:
   ack


##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }

+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000; // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000); // range is [5000, 15000)
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;

Review comment:
   I wasn't going for a uniform distribution, just any non-pigeonholed distribution (see the existing `testPercentiles` for comparison). I was just trying to verify the basic validity and get a rough estimate of the accuracy here, in case it turned out to be 5000% off. Good point about the overflow, though. Pretty annoying that you can't give a bound to `nextLong` like you can with `nextInt` :/ (see the workaround sketch after this thread)


##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);

             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
   We discussed this offline, but in case anyone else was wondering: yes. We can't record the latency _after_ processing for source nodes due to our recursive DFS approach to processing, since the source node's `#process` doesn't actually complete until the record has been processed by every other node in the subtopology. And anyway, the intent of the source node metric is to gauge the e2e latency when the record arrives at the subtopology, which is exactly what we are recording here.
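A side note on the `StreamTask` comment above: here is a minimal, hypothetical sketch of what recording e2e latency on arrival might look like, assuming a per-node `Sensor` lookup. The class, field, and method names (`E2ELatencyRecorder`, `e2eLatencySensors`) are illustrative and not the PR's actual implementation; the point is only that the latency is taken against wall-clock time before `process()` starts the recursive descent into the subtopology.

```java
import java.util.Map;

import org.apache.kafka.common.metrics.Sensor;

// Hypothetical helper, not the PR's actual code: record the
// record-timestamp-to-wall-clock latency for a node if an e2e latency
// sensor has been registered for it.
class E2ELatencyRecorder {
    private final Map<String, Sensor> e2eLatencySensors;

    E2ELatencyRecorder(final Map<String, Sensor> e2eLatencySensors) {
        this.e2eLatencySensors = e2eLatencySensors;
    }

    void maybeRecordE2ELatency(final long recordTimestamp,
                               final long wallClockTime,
                               final String nodeName) {
        final Sensor sensor = e2eLatencySensors.get(nodeName);
        if (sensor != null) {
            // For a source node this runs before the record fans out to the rest
            // of the subtopology, so it measures latency at arrival rather than
            // after the whole recursive traversal completes.
            sensor.record(wallClockTime - recordTimestamp, wallClockTime);
        }
    }
}
```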
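On the `nextLong` bound complaint in the `MetricsTest` comment above: a small sketch of one workaround, assuming a seeded `java.util.Random`. `Math.floorMod` never returns a negative result for a positive divisor, so it sidesteps the `Math.abs(Long.MIN_VALUE)` overflow; the result is slightly biased, which is fine here since the test isn't aiming for a uniform distribution anyway. (Newer JDKs also offer bounded `nextLong` variants directly.)

```java
import java.util.Random;

public class BoundedLongDraw {

    // Draw a non-negative long in [0, bound) from a seeded Random without the
    // Math.abs(Long.MIN_VALUE) overflow: floorMod is non-negative whenever the
    // divisor is positive, even when nextLong() returns Long.MIN_VALUE.
    static long nextBoundedLong(final Random prng, final long bound) {
        return Math.floorMod(prng.nextLong(), bound);
    }

    public static void main(final String[] args) {
        final long maximumValue = 1000L * 24 * 60 * 60 * 1000; // 1000 days in ms
        final Random prng = new Random(42L); // fixed seed for reproducibility
        for (int i = 0; i < 5; i++) {
            System.out.println(nextBoundedLong(prng, maximumValue));
        }
    }
}
```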
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";

+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
   @cadonna @vvcephei @mjsax We have several related discussions going on across this PR, so I'm just going to try and summarize here; let me know if I missed anything you still feel is important.

   The plan is to pin too-large/too-small values in the percentiles to the max/min for now and just log a warning. Since we're the only users of the `Percentiles` class, we can just modify it directly and avoid restricting the values for the min/max metrics, as John mentioned above.

   If a user is experiencing small negative e2e latencies, it's likely due to clock drift, and approximating them as 0 seems reasonable. If they're experiencing large negative e2e latencies, there's clearly something odd going on and the e2e latency percentiles aren't meaningful. But it will still show up in the min metric and alert them to this; presumably users would be interested to know.

   I'd like to avoid introducing a config, in particular because the maximum isn't an inherent mathematical property of percentiles (obviously); it's just an artifact of the existing percentiles algorithm. We can improve this and presumably remove the requirement to set a static max, but I felt the algorithm was "good enough" for now (and didn't want to make large-scale changes and/or rewrite it entirely right before the 2.6 deadline).

   In sum, I'd say the guiding principle for this PR and the initial metrics was to be useful without being misleadingly wrong. I think pinning the percentiles to the bounds while reporting the min/max as-is achieves this, and it leaves us the flexibility to improve the situation later.
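For illustration only, a rough sketch of the pin-and-warn idea summarized above, under the assumption that out-of-range values are clamped before they reach the percentile histogram while the raw values still feed the separate min/max metrics. The `BoundedRecorder` class and its method are made up for this sketch; they are not the actual change to `Percentiles`.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch of "pin to the bounds and warn": values outside
// [min, max] are clamped before being handed to the percentile histogram,
// so the percentile estimate stays inside the bucketed range, while the
// unclamped value can still be recorded by separate min/max metrics.
class BoundedRecorder {
    private static final Logger log = LoggerFactory.getLogger(BoundedRecorder.class);

    private final double min;
    private final double max;

    BoundedRecorder(final double min, final double max) {
        this.min = min;
        this.max = max;
    }

    double clampForPercentiles(final double value) {
        if (value < min) {
            log.warn("Value {} is below the minimum {}; pinning it to the minimum for the percentiles", value, min);
            return min;
        } else if (value > max) {
            log.warn("Value {} is above the maximum {}; pinning it to the maximum for the percentiles", value, max);
            return max;
        }
        return value;
    }
}
```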