ableegoldman commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r469582284
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java ########## @@ -289,6 +294,25 @@ public static Sensor processorAtSourceSensorOrForwardSensor(final String threadI return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics); } + public static Sensor e2ELatencySensor(final String threadId, + final String taskId, + final String processorNodeId, + final StreamsMetricsImpl streamsMetrics) { + final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY; + final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, RecordingLevel.INFO); Review comment: There's a slight hiccup with moving the INFO metrics from task to node level: We get the current sensor from `StreamsMetrics#taskLevelSensor` which computes the `fullSensorName` with the `#taskSensorPrefix` If we instead use `StreamsMetrics#nodeLevelSensor` then the` fullSensorName` is constructed from the `#nodeSensorPrefix`, which is obviously different. So moving this to a “true” node level sensor would be a breaking change, IIUC I think the best we can do is just move this from TaskMetrics to ProcessorNodeMetrics, but still leave it as a taskLevelSensor. Let me know if I'm missing something though ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org