vvcephei commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430575822



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       This necessity makes me think that our Percentiles metric algorithm 
needs to be improved. Admittedly, I haven't looked at the math, but it seems 
like it should be possible to be more adaptive.
   
   I'm in favor of not adding a config and just leaving it alone for now, so
that we keep the option open to fix the problem in the future by fixing the
algorithm.

##########
File path: clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
##########
@@ -492,6 +493,52 @@ public void testPercentiles() {
         assertEquals(75, (Double) p75.metricValue(), 1.0);
     }
 
+    @Test
+    public void testPercentilesWithRandomNumbersAndLinearBucketing() {
+        long seed = new Random().nextLong();
+        int sizeInBytes = 1000 * 1000;   // 1MB
+        long maximumValue = 1000 * 24 * 60 * 60 * 1000L; // if values are ms, max is 1000 days
+
+        try {
+            Random prng = new Random(seed);
+            int numberOfValues = 5000 + prng.nextInt(10_000);  // range is [5000, 15000)
+
+            Percentiles percs = new Percentiles(sizeInBytes,
+                                                maximumValue,
+                                                BucketSizing.LINEAR,
+                                                new Percentile(metrics.metricName("test.p90", "grp1"), 90),
+                                                new Percentile(metrics.metricName("test.p99", "grp1"), 99));
+            MetricConfig config = new MetricConfig().eventWindow(50).samples(2);
+            Sensor sensor = metrics.sensor("test", config);
+            sensor.add(percs);
+            Metric p90 = this.metrics.metrics().get(metrics.metricName("test.p90", "grp1"));
+            Metric p99 = this.metrics.metrics().get(metrics.metricName("test.p99", "grp1"));
+
+            final List<Long> values = new ArrayList<>(numberOfValues);
+            // record two windows worth of sequential values
+            for (int i = 0; i < numberOfValues; ++i) {
+                long value = Math.abs(prng.nextLong()) % maximumValue;

Review comment:
       Not sure if it really matters, but this is not a uniform distribution
(because MAX_VALUE and MIN_VALUE are not integer multiples of 1000 days). If you
wanted a uniform distribution, it looks like you could use the bounded `nextInt`
and cast to `long`.
   
   Also, FYI, `Math.abs(Long.MIN_VALUE) == Long.MIN_VALUE` (which is a negative
number), due to overflow.
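   
   For illustration, one bias-free way to draw a bounded `long` (the helper name
below is hypothetical and not part of the PR; plain `java.util.Random` on Java 8
has no bounded `nextLong`):
   
   ```java
   import java.util.Random;
   
   // Illustration only: uniform draw in [0, bound) via rejection sampling, avoiding
   // both the modulo bias and the Math.abs(Long.MIN_VALUE) overflow noted above.
   static long nextUniformLong(final Random prng, final long bound) {
       long bits;
       long value;
       do {
           bits = prng.nextLong() >>> 1;          // non-negative 63-bit value
           value = bits % bound;
       } while (bits - value + (bound - 1) < 0);  // reject draws from the incomplete top bucket
       return value;
   }
   ```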

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {

Review comment:
       Meta-review procedural question: In the future, can we try to avoid 
making the same comment in multiple places in the PR, since it leads to split 
discussions like this one?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       If I understand this right, we are recording sink latencies after 
processing, but source latencies before processing. This nicely avoids the 
problem with recording non-sink latencies after processing, but is it accurate?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       I'm fine with this as well, although I think it makes more sense either 
to pin to zero and warn or to just record the negative latency and warn. It 
feels like we're overthinking it. If the clocks are drifting a little and we 
report small negative numbers, the e2e latency is still _low_, which is still 
meaningful information. I really don't see a problem with just naively 
reporting it and not even bothering with a warning.
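   
   For what it's worth, the pin-to-zero variant is tiny. A minimal sketch against
the snippet above (assuming "negative" here simply means below zero):
   
   ```java
   // Sketch of the "pin to zero and warn" alternative: clamp small negative
   // latencies (most likely clock drift between hosts) instead of skipping the sample.
   if (e2eLatency < 0) {
       log.warn("Recording e2e latency of 0 for node {} because the computed value {} is negative, " +
                    "likely due to clock drift", nodeName, e2eLatency);
       e2eLatencySensor.record(0, now);
   } else {
       e2eLatencySensor.record(e2eLatency, now);
   }
   ```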

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       However, I *do not* think we should restrict the max value for metrics
other than the percentiles one. E.g., there's no reason to restrict the value
we record for the max and min metrics. You should be able to update the
Percentiles implementation to apply the maximum bound in the metric's record
method. Otherwise, I'd recommend recording to two separate sensors: one for the
bounded metrics and one for the unbounded ones.
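   
   To make the second option concrete, here is a rough sketch using the plain
metrics API (sensor and metric names are made up for illustration, a `Metrics`
registry is assumed to be in scope as `metrics`, and this glosses over the
Streams-level sensor helpers):
   
   ```java
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Max;
   import org.apache.kafka.common.metrics.stats.Min;
   import org.apache.kafka.common.metrics.stats.Percentile;
   import org.apache.kafka.common.metrics.stats.Percentiles;
   import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
   
   // Rough sketch only: the capped Percentiles stat lives on its own sensor, while
   // min/max live on a second sensor that records every latency unbounded.
   final Sensor percentilesSensor = metrics.sensor("record-e2e-latency-percentiles");
   percentilesSensor.add(new Percentiles(PERCENTILES_SIZE_IN_BYTES,
                                         MAXIMUM_E2E_LATENCY,
                                         BucketSizing.LINEAR,
                                         new Percentile(metrics.metricName("record-e2e-latency-p99", "group"), 99),
                                         new Percentile(metrics.metricName("record-e2e-latency-p90", "group"), 90)));
   
   final Sensor minMaxSensor = metrics.sensor("record-e2e-latency-min-max");
   minMaxSensor.add(metrics.metricName("record-e2e-latency-max", "group"), new Max());
   minMaxSensor.add(metrics.metricName("record-e2e-latency-min", "group"), new Min());
   
   // min/max always see the raw value; only the percentiles sensor is bounded
   minMaxSensor.record(e2eLatency, now);
   if (e2eLatency <= MAXIMUM_E2E_LATENCY) {
       percentilesSensor.record(e2eLatency, now);
   }
   ```
   
   Either way, the cap then only affects the histogram buckets, and min/max stay
truthful.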

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       Thanks, @ableegoldman, I think this is a fine tradeoff. Also helping is
the fact that we know all "source nodes" are actually instances of SourceNode,
which specifically do nothing except forward every record, so whether we 
measure these nodes before or "after" their processing logic should make no 
practical difference at all.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

