cadonna commented on a change in pull request #8697: URL: https://github.com/apache/kafka/pull/8697#discussion_r429158357
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ########## @@ -642,6 +642,30 @@ public static void addAvgAndMaxToSensor(final Sensor sensor, ); } + public static void addMaxAndMinToSensor(final Sensor sensor, Review comment: prop (super-nit): Could you call the method `addMinAndMaxToSensor()` since we have already one method that is called `addAvgAndMinAndMaxToSensor()`? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java ########## @@ -356,4 +354,27 @@ public void shouldGetDroppedRecordsSensorOrLateRecordDropSensor() { shouldGetDroppedRecordsSensor(); } } + + @Test + public void shouldGetRecordE2ELatencySensor() { + final String operation = "record-e2e-latency"; + expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO)) + .andReturn(expectedSensor); + expect(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).andReturn(tagMap); + StreamsMetricsImpl.addMaxAndMinToSensor( + expectedSensor, + TASK_LEVEL_GROUP, + tagMap, + operation, + RECORD_E2E_LATENCY_MAX_DESCRIPTION, Review comment: req: Please do not use the constant here. The point of this test is also to check the correctness of the description, i.e., the content of that constant. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java ########## @@ -14,15 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.kstream.internals.metrics; +package org.apache.kafka.streams.processor.internals.metrics; Review comment: Yes, very nice! ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ########## @@ -86,6 +87,14 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; + private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; + static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = + "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the " + + "system time when it has been fully processed by the task"; + static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = Review comment: See my comment above. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ########## @@ -86,6 +87,14 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; + private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; + static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = Review comment: req: Please define this constant as private as all the others. I left a request in `TaskMetricsTest` which makes this request clearer. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -137,6 +138,7 @@ public StreamTask(final TaskId id, } processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics); processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); + recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, taskId, streamsMetrics); Review comment: Q: Is there a specific reason to init the sensor here and not in `SinkNode`? You can init and store it there. That was one motivation to make `*Metrics` classes (e.g. `TaskMetrics`) static, so that you do not need any code in the processor context to get specific sensors. If there is not specific reason, you could get rid of the changes in the `*Context*` classes. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org