vvcephei commented on a change in pull request #8882:
URL: https://github.com/apache/kafka/pull/8882#discussion_r441118241



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
##########
@@ -337,7 +307,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId
             descriptionOfCount,
             descriptionOfAvgLatency,
             descriptionOfMaxLatency,
-            RecordingLevel.DEBUG,
+            recordingLevel,

Review comment:
       We erroneously ignored the provided recordingLevel and hard-coded it to
DEBUG. It didn't manifest because this method happens to always be called with
a recordingLevel of DEBUG anyway.
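A minimal sketch of why the hard-coded level stayed hidden, assuming simplified stand-ins (SketchRecordingLevel, SketchSensor, SketchSensorFactory are hypothetical, not Kafka classes) and assuming INFO < DEBUG < TRACE ordering where a sensor records only when its level is at or below the configured one:

```java
// Simplified stand-in for Kafka's Sensor.RecordingLevel (assumption: ordered
// INFO < DEBUG < TRACE). Not the real Kafka class.
enum SketchRecordingLevel { INFO, DEBUG, TRACE }

// Hypothetical sensor holding only a name and a recording level.
final class SketchSensor {
    final String name;
    final SketchRecordingLevel level;

    SketchSensor(final String name, final SketchRecordingLevel level) {
        this.name = name;
        this.level = level;
    }

    // Records only when this sensor's level is at or below the configured verbosity.
    boolean shouldRecord(final SketchRecordingLevel configured) {
        return level.ordinal() <= configured.ordinal();
    }
}

// Hypothetical factory contrasting the bug with the fix.
final class SketchSensorFactory {
    // Before: the caller's level is accepted but silently replaced by DEBUG.
    static SketchSensor buggy(final String name, final SketchRecordingLevel recordingLevel) {
        return new SketchSensor(name, SketchRecordingLevel.DEBUG);
    }

    // After: the caller's level is passed through unchanged.
    static SketchSensor fixed(final String name, final SketchRecordingLevel recordingLevel) {
        return new SketchSensor(name, recordingLevel);
    }
}
```

When every caller passes DEBUG, buggy and fixed build identical sensors; only an INFO caller would notice that the buggy variant is suppressed under an INFO configuration.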

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -160,18 +162,21 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       Standby tasks don't currently register any sensors, but I'd personally
rather be defensive and idempotently ensure we remove any sensors while
closing.
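A minimal sketch of the idempotent cleanup being described, assuming a hypothetical registry keyed by (thread, task); SketchTaskSensorRegistry and SketchStandbyTask are illustrative stand-ins, not StreamsMetricsImpl or StandbyTask:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry of task-level sensors keyed by (thread, task).
final class SketchTaskSensorRegistry {
    private final Map<String, List<String>> sensorsByTask = new HashMap<>();

    private static String key(final String threadId, final String taskId) {
        return threadId + "." + taskId;
    }

    void addTaskLevelSensor(final String threadId, final String taskId, final String sensorName) {
        sensorsByTask.computeIfAbsent(key(threadId, taskId), k -> new ArrayList<>()).add(sensorName);
    }

    // Idempotent: returns how many sensors were removed, 0 if the task had none
    // or they were already removed; it never throws.
    int removeAllTaskLevelSensors(final String threadId, final String taskId) {
        final List<String> removed = sensorsByTask.remove(key(threadId, taskId));
        return removed == null ? 0 : removed.size();
    }

    int totalSensors() {
        int total = 0;
        for (final List<String> sensors : sensorsByTask.values()) {
            total += sensors.size();
        }
        return total;
    }
}

// Hypothetical standby task that cleans up on close even though it registers nothing today.
final class SketchStandbyTask {
    private final SketchTaskSensorRegistry registry;
    private final String taskId;

    SketchStandbyTask(final SketchTaskSensorRegistry registry, final String taskId) {
        this.registry = registry;
        this.taskId = taskId;
    }

    void closeClean() {
        registry.removeAllTaskLevelSensors(Thread.currentThread().getName(), taskId);
    }
}
```

Because removal of an absent key is a no-op, calling closeClean on a task with no sensors, or calling it twice, is harmless.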

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -235,7 +235,7 @@ public StateStore getStateStore(final String name) {
         setCurrentNode(child);
         child.process(key, value);
         if (child.isTerminalNode()) {
-            streamTask.maybeRecordE2ELatency(timestamp(), child.name());
+            streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name());

Review comment:
       This previously looked up the actual current system time. I thought we
had decided to use the cached system time instead. Can you set me straight,
@ableegoldman?
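A minimal sketch of the two options under discussion, assuming end-to-end latency is "now" minus the record timestamp and that the cached time is refreshed once per processing iteration. SketchE2ELatencyRecorder and its methods are hypothetical, and the node-name argument from the real call is omitted:

```java
import java.util.function.LongSupplier;

// Hypothetical recorder: e2e latency = nowMs - recordTimestampMs, tracking the max.
final class SketchE2ELatencyRecorder {
    private final LongSupplier wallClockMs;
    private long cachedSystemTimeMs;
    private long maxLatencyMs = Long.MIN_VALUE;

    SketchE2ELatencyRecorder(final LongSupplier wallClockMs) {
        this.wallClockMs = wallClockMs;
        this.cachedSystemTimeMs = wallClockMs.getAsLong();
    }

    // Read the clock once, e.g. at the top of each processing loop iteration.
    void updateCachedSystemTime() {
        cachedSystemTimeMs = wallClockMs.getAsLong();
    }

    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }

    // The caller supplies "now" explicitly, mirroring the new call shape in the diff.
    void maybeRecordE2ELatency(final long recordTimestampMs, final long nowMs) {
        maxLatencyMs = Math.max(maxLatencyMs, nowMs - recordTimestampMs);
    }

    long maxLatencyMs() {
        return maxLatencyMs;
    }
}
```

The trade-off: passing currentSystemTimeMs() avoids a clock read per record, but it can understate latency by however long the current iteration has been running; passing a fresh clock read is exact but costs a system call per terminal-node record.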

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -400,20 +402,20 @@ public void shouldRecordBufferedRecords() {
 
         final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString(), StreamsConfig.METRICS_LATEST);
 
-        assertThat(metric.metricValue(), equalTo(0.0d));
+        assertThat(metric.metricValue(), equalTo(0.0));

Review comment:
       Just cleaning up some oddball literals.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -454,24 +456,7 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() {
         task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L)));
         task.process(100L);
 
-        assertThat(maxMetric.metricValue(), equalTo(100d));
-    }
-
-    @Test
-    public void shouldRecordE2ELatencyOnProcessForTerminalNodes() {
-        time = new MockTime(0L, 0L, 0L);
-        metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
-        task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST);
-
-        final String terminalNode = processorStreamTime.name();
-
-        final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST);
-
-        // e2e latency = 100
-        time.setCurrentTimeMs(100L);
-        task.maybeRecordE2ELatency(0L, terminalNode);

Review comment:
       This test wasn't really testing the "terminal node" code path in
ProcessorContextImpl, just that the two-argument maybeRecordE2ELatency overload
actually fetches the current system time. Since I removed that overload, we
don't need the test.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -675,27 +670,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor,
                 tags),
             new Max()
         );
-
-        sensor.add(

Review comment:
       Dropped the percentiles metric.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -150,18 +151,18 @@ public StreamTask(final TaskId id,
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
         bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
 
-        for (final String terminalNode : topology.terminalNodes()) {
+        for (final String terminalNodeName : topology.terminalNodes()) {
             e2eLatencySensors.put(
-                terminalNode,
-                ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics)
+                terminalNodeName,
+                TaskMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, RecordingLevel.INFO, streamsMetrics)

Review comment:
       Fixes the sensor leak by simply registering these as task-level sensors. 
Note the node name is still provided to scope the sensors themselves.
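A minimal sketch of the scoping idea, assuming a hypothetical SketchTaskMetrics (not Kafka's TaskMetrics): per-terminal-node sensors live in the task-level scope, so a single per-task removal cleans them all up, while the node name only distinguishes the sensors from one another:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical task-level sensor scope; the node name appears only in the sensor's own name.
final class SketchTaskMetrics {
    private final Map<String, Set<String>> taskLevelSensors = new HashMap<>();

    String e2eLatencySensor(final String taskId, final String terminalNodeName) {
        final String sensorName = "task." + taskId + ".node." + terminalNodeName + ".record-e2e-latency";
        taskLevelSensors.computeIfAbsent(taskId, k -> new LinkedHashSet<>()).add(sensorName);
        return sensorName;
    }

    // One call per task removes every per-node sensor registered under it.
    void removeAllTaskLevelSensors(final String taskId) {
        taskLevelSensors.remove(taskId);
    }

    int sensorCount(final String taskId) {
        final Set<String> sensors = taskLevelSensors.get(taskId);
        return sensors == null ? 0 : sensors.size();
    }
}
```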

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -456,12 +457,14 @@ public void postCommit() {
 
     @Override
     public void closeClean() {
+        streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString());

Review comment:
       We previously relied on the task manager to remove these sensors before 
calling close, but forgot to do it before recycling. In retrospect, it's better 
to do it within the same class that creates the sensors to begin with.
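A minimal sketch of keeping registration and removal in the same class, assuming a hypothetical SketchStreamTask that writes into a shared registry; both exit paths (clean close and recycle) call the same cleanup, which is the path the recycle case previously missed:

```java
import java.util.List;
import java.util.Set;

// Hypothetical task that owns the full lifecycle of the sensors it registers.
final class SketchStreamTask {
    private final Set<String> sharedRegistry;
    private final String taskId;

    SketchStreamTask(final Set<String> sharedRegistry, final String taskId, final List<String> terminalNodes) {
        this.sharedRegistry = sharedRegistry;
        this.taskId = taskId;
        for (final String node : terminalNodes) {
            sharedRegistry.add(taskId + "/" + node + "/record-e2e-latency");
        }
    }

    // Removes only this task's sensors from the shared registry.
    private void removeOwnSensors() {
        final String prefix = taskId + "/";
        sharedRegistry.removeIf(name -> name.startsWith(prefix));
    }

    void closeClean() {
        removeOwnSensors();
    }

    void closeAndRecycleState() {
        // Same cleanup as closeClean, so recycling can no longer leak sensors.
        removeOwnSensors();
    }
}
```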

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1806,11 +1793,19 @@ public void shouldRecycleTask() {
         task.initializeIfNeeded();
         task.completeRestoration();
 
+        assertThat(getTaskMetrics(), not(empty()));
+
         task.closeAndRecycleState();
 
+        assertThat(getTaskMetrics(), empty());

Review comment:
       Verified this fails on trunk.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

