vvcephei commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430575822
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";

+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
   This necessity makes me think that our Percentiles metric algorithm needs to be improved. Admittedly, I haven't looked at the math, but it seems like it should be possible to make it more adaptive. I'm in favor of not adding a config and just leaving it alone for now, so that we keep the option of fixing the problem in the future by fixing the algorithm.

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }

+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000; // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000); // ranges is [5000, 15000]
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;

Review comment:
   Not sure if it really matters, but this is not a uniform distribution (because MAX_VALUE and MIN_VALUE are not integer multiples of 1000 days). If you wanted a uniform distribution, it looks like you can use the bounded `nextInt` and cast to `long`. Also, FYI, `Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE` (which is a negative number), due to overflow.
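   As a side note, here is a small standalone sketch (not part of the PR) illustrating the `Math.abs(Long.MIN_VALUE)` overflow and one way to draw a uniform, seedable value in `[0, maximumValue)`. Since the test's 1000-day bound does not fit in an `int`, the sketch uses a bounded long draw rather than the bounded `nextInt`; the class name and constants are made up for illustration:

   ```java
   import java.util.Random;
   import java.util.concurrent.ThreadLocalRandom;

   public class UniformBoundedLongSketch {
       public static void main(String[] args) {
           final long maximumValue = 1000L * 24 * 60 * 60 * 1000; // 1000 days in ms

           // Math.abs overflows for Long.MIN_VALUE: the "absolute value" is still negative.
           System.out.println(Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE); // true

           // Math.abs(nextLong()) % bound is slightly biased toward small residues,
           // because 2^63 is not an integer multiple of maximumValue.
           final Random prng = new Random(42L);
           final long biased = Math.abs(prng.nextLong()) % maximumValue;

           // A uniform, seedable alternative: the bounded long stream on Random ...
           final long uniformSeedable = prng.longs(0L, maximumValue).findFirst().getAsLong();
           // ... or, when the seed doesn't matter, the bounded nextLong on ThreadLocalRandom.
           final long uniform = ThreadLocalRandom.current().nextLong(maximumValue);

           System.out.printf("biased=%d uniformSeedable=%d uniform=%d%n", biased, uniformSeedable, uniform);
       }
   }
   ```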
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }

+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {

Review comment:
   Meta-review procedural question: In the future, can we try to avoid making the same comment in multiple places in the PR, since it leads to split discussions like this one?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
   If I understand this right, we are recording sink latencies after processing, but source latencies before processing. This nicely avoids the problem with recording non-sink latencies after processing, but is it accurate?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }

+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                    nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
   I'm fine with this as well, although I think it makes more sense either to pin to zero and warn or to just record the negative latency and warn. It feels like we're overthinking it. If the clocks are drifting a little and we report small negative numbers, the e2e latency is still _low_, which is still meaningful information. I really don't see a problem with just naively reporting it and not even bothering with a warning.
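   To make the "pin to zero and warn" option concrete, here is a minimal, self-contained sketch of that branch logic; it is not the PR's implementation, and the class name, constant, sensor name, and example timestamps are all made up for illustration:

   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;

   public class PinToZeroSketch {
       // illustrative bound; the PR's MAXIMUM_E2E_LATENCY is also 10 days
       private static final long MAX_E2E_LATENCY_MS = 10L * 24 * 60 * 60 * 1000;

       public static void main(String[] args) {
           final Metrics metrics = new Metrics();
           final Sensor e2eLatencySensor = metrics.sensor("e2e-latency");

           recordE2ELatency(e2eLatencySensor, 1_589_000_000_000L, 1_589_000_000_123L); // normal: 123 ms
           recordE2ELatency(e2eLatencySensor, 1_589_000_000_200L, 1_589_000_000_123L); // clock drift: -77 ms -> pinned to 0
           metrics.close();
       }

       private static void recordE2ELatency(final Sensor sensor, final long recordTimestamp, final long now) {
           final long e2eLatency = now - recordTimestamp;
           if (e2eLatency > MAX_E2E_LATENCY_MS) {
               System.out.printf("skipping e2e latency %d, above maximum %d%n", e2eLatency, MAX_E2E_LATENCY_MS);
           } else if (e2eLatency < 0) {
               // small negative values usually just mean the clocks drifted a little;
               // the latency is still "low", so record zero instead of dropping the datapoint
               System.out.printf("pinning negative e2e latency %d to 0%n", e2eLatency);
               sensor.record(0, now);
           } else {
               sensor.record(e2eLatency, now);
           }
       }
   }
   ```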
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";

+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
   However, I *do not* think we should restrict the max value for metrics other than the percentiles one. E.g., there's no reason to restrict the value we record for the max and min metrics. You should be able to update the Percentiles implementation to apply the maximum bound in its record method. Otherwise, I'd recommend recording via two separate sensors: one for the bounded metrics and one for the unbounded ones (see the sketch at the end of this message).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
   Thanks, @ableegoldman, I think this is a fine tradeoff. Also helping is the fact that we know all "source nodes" are actually instances of SourceNode, which specifically do nothing except forward every record, so whether we measure these nodes before or "after" their processing logic should make no practical difference at all.
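   Here is a rough standalone sketch of the two-sensor idea from the StreamsMetricsImpl comment above, using the plain Kafka `Metrics` API; the sensor and metric names (`e2e-latency`, `grp`, ...) and the clamp-vs-skip choice are illustrative, not what the PR does:

   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Max;
   import org.apache.kafka.common.metrics.stats.Min;
   import org.apache.kafka.common.metrics.stats.Percentile;
   import org.apache.kafka.common.metrics.stats.Percentiles;
   import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

   public class TwoSensorSketch {
       public static void main(String[] args) {
           final Metrics metrics = new Metrics();
           final long maxLatency = 10L * 24 * 60 * 60 * 1000; // 10 days, mirrors MAXIMUM_E2E_LATENCY
           final int sizeInBytes = 1000 * 1000;               // 1 MB, mirrors PERCENTILES_SIZE_IN_BYTES

           // Unbounded sensor: min/max can record any value, no bound applied.
           final Sensor unbounded = metrics.sensor("e2e-latency");
           unbounded.add(metrics.metricName("e2e-latency-min", "grp"), new Min());
           unbounded.add(metrics.metricName("e2e-latency-max", "grp"), new Max());

           // Bounded sensor: only the Percentiles histogram needs the upper bound.
           final Sensor bounded = metrics.sensor("e2e-latency-percentiles");
           bounded.add(new Percentiles(sizeInBytes, maxLatency, BucketSizing.LINEAR,
               new Percentile(metrics.metricName("e2e-latency-p99", "grp"), 99)));

           final long e2eLatency = 123L; // whatever was measured
           unbounded.record(e2eLatency);
           bounded.record(Math.min(e2eLatency, maxLatency)); // clamp (or skip) only for the histogram
           metrics.close();
       }
   }
   ```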