mjsax commented on a change in pull request #8697: URL: https://github.com/apache/kafka/pull/8697#discussion_r429499887
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() { return punctuated; } + void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) { + maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName); + } + + private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) { + final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName); + if (e2eLatencySensor == null) { + throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName); + } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { + final long e2eLatency = now - recordTimestamp; + if (e2eLatency > MAXIMUM_E2E_LATENCY) { + log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}", + nodeName, e2eLatency, MAXIMUM_E2E_LATENCY); + } else if (e2eLatency < MINIMUM_E2E_LATENCY) { Review comment: For this case, should we record "zero" instead? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org