[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710843#comment-16710843
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---------------------------------------

mjsax closed pull request #5984: KAFKA-7660: fix streams and Metrics memory leaks
URL: https://github.com/apache/kafka/pull/5984
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 512c18e74ac..874c172acf5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -367,6 +367,12 @@ public void removeSensor(String name) {
                             removeMetric(metric.metricName());
                         log.debug("Removed sensor with name {}", name);
                         childSensors = childrenSensors.remove(sensor);
+                        for (final Sensor parent : sensor.parents()) {
+                            final List<Sensor> peers = 
childrenSensors.get(parent);
+                            if (peers != null) {
+                                peers.remove(sensor);
+                            }
+                        }
                     }
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 4a9b488d1c5..33829f9f5a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -18,13 +18,15 @@
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
 /**
  * A sensor applies a continuous sequence of numerical values to a set of 
associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link 
#record(double)} api and would maintain a set
@@ -128,6 +130,10 @@ public String name() {
         return this.name;
     }
 
+    List<Sensor> parents() {
+        return unmodifiableList(asList(parents));
+    }
+
     /**
      * Record an occurrence, this is just short-hand for {@link 
#record(double) record(1.0)}
      */
@@ -260,6 +266,6 @@ public boolean hasExpired() {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
+        return unmodifiableList(this.metrics);
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5797b369758..5ee79de81f2 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -12,6 +12,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -169,6 +171,20 @@ public void testBadSensorHierarchy() {
         metrics.sensor("gc", c1, c2); // should fail
     }
 
+    @Test
+    public void testRemoveChildSensor() {
+        final Metrics metrics = new Metrics();
+
+        final Sensor parent = metrics.sensor("parent");
+        final Sensor child = metrics.sensor("child", parent);
+
+        assertEquals(singletonList(child), 
metrics.childrenSensors().get(parent));
+
+        metrics.removeSensor("child");
+
+        assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+    }
+
     @Test
     public void testRemoveSensor() {
         int size = metrics.metrics().size();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index bccf736d0a8..93748826720 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -196,11 +196,11 @@ public void measureLatencyNs(final Time time, final 
Runnable action, final Senso
      */
     @Override
     public void removeSensor(Sensor sensor) {
-        Sensor parent = null;
         Objects.requireNonNull(sensor, "Sensor is null");
 
         metrics.removeSensor(sensor.name());
-        parent = parentSensors.get(sensor);
+
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index c6bc2508f2d..42b48ecc87a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -19,13 +19,17 @@
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.junit.Test;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.unmodifiableMap;
 import static org.junit.Assert.assertEquals;
 
 public class StreamsMetricsImplTest {
@@ -53,19 +57,27 @@ public void testRemoveSensor() {
         String entity = "entity";
         String operation = "put";
         Map<String, String> tags = new HashMap<>();
-        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new 
Metrics(), groupName, tags);
+        final Metrics metrics = new Metrics();
+        final Map<MetricName, KafkaMetric> initialMetrics = 
unmodifiableMap(new LinkedHashMap<>(metrics.metrics()));
+        StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, 
groupName, tags);
 
         Sensor sensor1 = streamsMetrics.addSensor(sensorName, 
Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor1);
+        assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor1a = streamsMetrics.addSensor(sensorName, 
Sensor.RecordingLevel.DEBUG, sensor1);
         streamsMetrics.removeSensor(sensor1a);
+        assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor2 = streamsMetrics.addLatencyAndThroughputSensor(scope, 
entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor2);
+        assertEquals(initialMetrics, metrics.metrics());
 
         Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, 
operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+        assertEquals(initialMetrics, metrics.metrics());
+
+        assertEquals(emptyMap(), streamsMetrics.parentSensors);
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> --------------------------------
>
>                 Key: KAFKA-7660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7660
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Patrik Kleindl
>            Assignee: John Roesler
>            Priority: Minor
>             Fix For: 0.10.0.2, 0.11.0.4, 1.0.3, 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>         Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During the analysis of JVM memory, two possible issues were shown which I 
> would like to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics" seems to be used in Sensors.java as a 
> literal and not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked too deeply into the code, but I noticed that the class 
> Sensor.java, which is used as a key in the HashMap, does not implement the 
> equals or hashCode methods. I am not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0
> We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients)
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to