mjsax commented on code in PR #22452:
URL: https://github.com/apache/kafka/pull/22452#discussion_r3365741274


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1289,6 +1293,22 @@ void maybeRecordE2ELatency(final long recordTimestamp, 
final long now, final Str
         }
     }
 
+    /**
+     * Records e2e latency for a terminal node using the wall-clock time at 
which the record was
+     * fully processed by the topology.
+     */
+    void maybeRecordE2ELatency(final long recordTimestamp, final String 
nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency 
but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && 
e2eLatencySensor.hasMetrics()) {
+            if (currentRecordE2ELatencyTimeMs < 0) {
+                currentRecordE2ELatencyTimeMs = time.milliseconds();

Review Comment:
   Same concern as on the other PR for this ticket:
   
   https://github.com/apache/kafka/pull/21618#pullrequestreview-4064367259



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to