[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704001#comment-16704001
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---------------------------------------

guozhangwang closed pull request #5953: KAFKA-7660: fix parentSensors memory 
leak
URL: https://github.com/apache/kafka/pull/5953
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8483791e2e7..ec4f8e5e5db 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -234,9 +234,11 @@ private static Sensor 
createTaskAndNodeLatencyAndThroughputSensors(final String
             final Sensor parent = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
             addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
             addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+
             final Sensor sensor = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
             addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
             addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+
             return sensor;
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 8ec2711e764..dd6cc4a2185 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -115,11 +115,9 @@ public final Sensor taskLevelSensor(final String taskName,
     public final void removeAllTaskLevelSensors(final String taskName) {
         final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
-            if (taskLevelSensors.containsKey(key)) {
-                while (!taskLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(taskLevelSensors.get(key).pop());
-                }
-                taskLevelSensors.remove(key);
+            final Deque<String> sensors = taskLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -152,10 +150,9 @@ public Sensor nodeLevelSensor(final String taskName,
     public final void removeAllNodeLevelSensors(final String taskName, final 
String processorNodeName) {
         final String key = nodeSensorPrefix(taskName, processorNodeName);
         synchronized (nodeLevelSensors) {
-            if (nodeLevelSensors.containsKey(key)) {
-                while (!nodeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(nodeLevelSensors.get(key).pop());
-                }
+            final Deque<String> sensors = nodeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -188,11 +185,9 @@ public final Sensor cacheLevelSensor(final String taskName,
     public final void removeAllCacheLevelSensors(final String taskName, final 
String cacheName) {
         final String key = cacheSensorPrefix(taskName, cacheName);
         synchronized (cacheLevelSensors) {
-            if (cacheLevelSensors.containsKey(key)) {
-                while (!cacheLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(cacheLevelSensors.get(key).pop());
-                }
-                cacheLevelSensors.remove(key);
+            final Deque<String> strings = cacheLevelSensors.remove(key);
+            while (strings != null && !strings.isEmpty()) {
+                metrics.removeSensor(strings.pop());
             }
         }
     }
@@ -225,11 +220,9 @@ public final Sensor storeLevelSensor(final String taskName,
     public final void removeAllStoreLevelSensors(final String taskName, final 
String storeName) {
         final String key = storeSensorPrefix(taskName, storeName);
         synchronized (storeLevelSensors) {
-            if (storeLevelSensors.containsKey(key)) {
-                while (!storeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(storeLevelSensors.get(key).pop());
-                }
-                storeLevelSensors.remove(key);
+            final Deque<String> sensors = storeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -413,12 +406,19 @@ public void removeSensor(final Sensor sensor) {
         Objects.requireNonNull(sensor, "Sensor is null");
         metrics.removeSensor(sensor.name());
 
-        final Sensor parent = parentSensors.get(sensor);
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
     }
 
+    /**
+     * Visible for testing
+     */
+    Map<Sensor, Sensor> parentSensors() {
+        return Collections.unmodifiableMap(parentSensors);
+    }
+
     private static String groupNameFromScope(final String scopeName) {
         return "stream-" + scopeName + "-metrics";
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 59%
rename from 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 7ce27b4b6d6..cadfdb0cef1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
 
 import org.apache.kafka.common.MetricName;
@@ -23,12 +23,21 @@
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class StreamsMetricsImplTest {
 
@@ -62,6 +71,60 @@ public void testRemoveSensor() {
 
         final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, 
entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+
+        assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+    }
+
+    @Test
+    public void testMutiLevelSensorRemoval() {
+        final Metrics registry = new Metrics();
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
"");
+        for (final MetricName defaultMetric : registry.metrics().keySet()) {
+            registry.removeMetric(defaultMetric);
+        }
+
+        final String taskName = "taskName";
+        final String operation = "operation";
+        final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));
+
+        final String processorNodeName = "processorNodeName";
+        final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
+
+        final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
+        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+
+        final int numberOfTaskMetrics = registry.metrics().size();
+
+        final Sensor sensor1 = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
+        addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
+        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+
+        assertThat(registry.metrics().size(), 
greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
+        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor sensor2 = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
+        addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
+        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+
+        assertThat(registry.metrics().size(), 
greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        metrics.removeAllTaskLevelSensors(taskName);
+
+        assertThat(registry.metrics().size(), equalTo(0));
     }
 
     @Test
@@ -115,21 +178,21 @@ public void testTotalMetricDoesntDecrease() {
         final String operation = "op";
 
         final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
-                scope,
-                entity,
-                operation,
-                Sensor.RecordingLevel.INFO
+            scope,
+            entity,
+            operation,
+            Sensor.RecordingLevel.INFO
         );
 
         final double latency = 100.0;
         final MetricName totalMetricName = metrics.metricName(
-                "op-total",
-                "stream-scope-metrics",
-                "",
-                "client-id",
-                "",
-                "scope-id",
-                "entity"
+            "op-total",
+            "stream-scope-metrics",
+            "",
+            "client-id",
+            "",
+            "scope-id",
+            "entity"
         );
 
         final KafkaMetric totalMetric = metrics.metric(totalMetricName);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> --------------------------------
>
>                 Key: KAFKA-7660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7660
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Patrik Kleindl
>            Priority: Minor
>         Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During an analysis of JVM memory, two possible issues showed up that I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics"  seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping a lot of 
> objects alive. In our case, the reported size was 40–50 MB each.
> I haven't looked deeply into the code, but I noticed that the class Sensor.java, 
> which is used as a key in the HashMap, does not implement the equals or hashCode 
> methods. I am not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to