[ 
https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16709296#comment-16709296
 ] 

ASF GitHub Bot commented on KAFKA-7660:
---------------------------------------

mjsax closed pull request #5979: KAFKA-7660: fix streams and Metrics memory leaks
URL: https://github.com/apache/kafka/pull/5979
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index a6da9f90397..9e2b6f18f0c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -41,6 +41,8 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Collections.emptyList;
+
 /**
  * A registry of sensors and metrics.
  * <p>
@@ -446,6 +448,9 @@ public void removeSensor(String name) {
                             removeMetric(metric.metricName());
                         log.debug("Removed sensor with name {}", name);
                         childSensors = childrenSensors.remove(sensor);
+                        for (final Sensor parent : sensor.parents()) {
+                            childrenSensors.getOrDefault(parent, 
emptyList()).remove(sensor);
+                        }
                     }
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index ccbe8aad9cd..1af9419bc75 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -22,7 +22,6 @@
 import org.apache.kafka.common.utils.Utils;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -32,6 +31,9 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+
 /**
  * A sensor applies a continuous sequence of numerical values to a set of 
associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link 
#record(double)} api and would maintain a set
@@ -133,6 +135,10 @@ public String name() {
         return this.name;
     }
 
+    List<Sensor> parents() {
+        return unmodifiableList(asList(parents));
+    }
+
     /**
      * Record an occurrence, this is just short-hand for {@link 
#record(double) record(1.0)}
      */
@@ -291,7 +297,7 @@ public boolean hasExpired() {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(new 
LinkedList<>(this.metrics.values()));
+        return unmodifiableList(new LinkedList<>(this.metrics.values()));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index eb3f7752a6c..992abcba7d6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.common.metrics;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -202,6 +204,20 @@ public void testBadSensorHierarchy() {
         metrics.sensor("gc", c1, c2); // should fail
     }
 
+    @Test
+    public void testRemoveChildSensor() {
+        final Metrics metrics = new Metrics();
+
+        final Sensor parent = metrics.sensor("parent");
+        final Sensor child = metrics.sensor("child", parent);
+
+        assertEquals(singletonList(child), 
metrics.childrenSensors().get(parent));
+
+        metrics.removeSensor("child");
+
+        assertEquals(emptyList(), metrics.childrenSensors().get(parent));
+    }
+
     @Test
     public void testRemoveSensor() {
         int size = metrics.metrics().size();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8dc64173379..ec4f8e5e5db 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -31,6 +31,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
@@ -165,15 +167,13 @@ Sensor sourceNodeForwardSensor() {
         private NodeMetrics(final StreamsMetricsImpl metrics, final String 
processorNodeName, final ProcessorContext context) {
             this.metrics = metrics;
 
-            final String group = "stream-processor-node-metrics";
             final String taskName = context.taskId().toString();
-            final Map<String, String> tagMap = metrics.tagMap("task-id", 
context.taskId().toString(), "processor-node-id", processorNodeName);
-            final Map<String, String> allTagMap = metrics.tagMap("task-id", 
context.taskId().toString(), "processor-node-id", "all");
+            final Map<String, String> tagMap = metrics.tagMap("task-id", 
context.taskId().toString(), PROCESSOR_NODE_ID_TAG, processorNodeName);
+            final Map<String, String> allTagMap = metrics.tagMap("task-id", 
context.taskId().toString(), PROCESSOR_NODE_ID_TAG, "all");
 
             nodeProcessTimeSensor = 
createTaskAndNodeLatencyAndThroughputSensors(
                 "process",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -183,7 +183,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final 
String processorNode
             nodePunctuateTimeSensor = 
createTaskAndNodeLatencyAndThroughputSensors(
                 "punctuate",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -193,7 +192,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final 
String processorNode
             nodeCreationSensor = createTaskAndNodeLatencyAndThroughputSensors(
                 "create",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -204,7 +202,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final 
String processorNode
             nodeDestructionSensor = 
createTaskAndNodeLatencyAndThroughputSensors(
                 "destroy",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -214,7 +211,6 @@ private NodeMetrics(final StreamsMetricsImpl metrics, final 
String processorNode
             sourceNodeForwardSensor = 
createTaskAndNodeLatencyAndThroughputSensors(
                 "forward",
                 metrics,
-                group,
                 taskName,
                 processorNodeName,
                 allTagMap,
@@ -231,17 +227,18 @@ private void removeAllSensors() {
 
         private static Sensor 
createTaskAndNodeLatencyAndThroughputSensors(final String operation,
                                                                            
final StreamsMetricsImpl metrics,
-                                                                           
final String group,
                                                                            
final String taskName,
                                                                            
final String processorNodeName,
                                                                            
final Map<String, String> taskTags,
                                                                            
final Map<String, String> nodeTags) {
             final Sensor parent = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
-            addAvgMaxLatency(parent, group, taskTags, operation);
-            addInvocationRateAndCount(parent, group, taskTags, operation);
+            addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
+            addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+
             final Sensor sensor = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
-            addAvgMaxLatency(sensor, group, nodeTags, operation);
-            addInvocationRateAndCount(sensor, group, nodeTags, operation);
+            addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
+            addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+
             return sensor;
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 170311238ec..dd6cc4a2185 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -51,6 +51,9 @@
     private static final String SENSOR_PREFIX_DELIMITER = ".";
     private static final String SENSOR_NAME_DELIMITER = ".s.";
 
+    public static final String PROCESSOR_NODE_METRICS_GROUP = 
"stream-processor-node-metrics";
+    public static final String PROCESSOR_NODE_ID_TAG = "processor-node-id";
+
     public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
         this.threadName = threadName;
@@ -112,11 +115,9 @@ public final Sensor taskLevelSensor(final String taskName,
     public final void removeAllTaskLevelSensors(final String taskName) {
         final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
-            if (taskLevelSensors.containsKey(key)) {
-                while (!taskLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(taskLevelSensors.get(key).pop());
-                }
-                taskLevelSensors.remove(key);
+            final Deque<String> sensors = taskLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -149,10 +150,9 @@ public Sensor nodeLevelSensor(final String taskName,
     public final void removeAllNodeLevelSensors(final String taskName, final 
String processorNodeName) {
         final String key = nodeSensorPrefix(taskName, processorNodeName);
         synchronized (nodeLevelSensors) {
-            if (nodeLevelSensors.containsKey(key)) {
-                while (!nodeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(nodeLevelSensors.get(key).pop());
-                }
+            final Deque<String> sensors = nodeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -185,11 +185,9 @@ public final Sensor cacheLevelSensor(final String taskName,
     public final void removeAllCacheLevelSensors(final String taskName, final 
String cacheName) {
         final String key = cacheSensorPrefix(taskName, cacheName);
         synchronized (cacheLevelSensors) {
-            if (cacheLevelSensors.containsKey(key)) {
-                while (!cacheLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(cacheLevelSensors.get(key).pop());
-                }
-                cacheLevelSensors.remove(key);
+            final Deque<String> strings = cacheLevelSensors.remove(key);
+            while (strings != null && !strings.isEmpty()) {
+                metrics.removeSensor(strings.pop());
             }
         }
     }
@@ -222,11 +220,9 @@ public final Sensor storeLevelSensor(final String taskName,
     public final void removeAllStoreLevelSensors(final String taskName, final 
String storeName) {
         final String key = storeSensorPrefix(taskName, storeName);
         synchronized (storeLevelSensors) {
-            if (storeLevelSensors.containsKey(key)) {
-                while (!storeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(storeLevelSensors.get(key).pop());
-                }
-                storeLevelSensors.remove(key);
+            final Deque<String> sensors = storeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -410,12 +406,19 @@ public void removeSensor(final Sensor sensor) {
         Objects.requireNonNull(sensor, "Sensor is null");
         metrics.removeSensor(sensor.name());
 
-        final Sensor parent = parentSensors.get(sensor);
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
     }
 
+    /**
+     * Visible for testing
+     */
+    Map<Sensor, Sensor> parentSensors() {
+        return Collections.unmodifiableMap(parentSensors);
+    }
+
     private static String groupNameFromScope(final String scopeName) {
         return "stream-" + scopeName + "-metrics";
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 62%
rename from 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 7ce27b4b6d6..38629c38d41 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
 
 import org.apache.kafka.common.MetricName;
@@ -23,12 +23,21 @@
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class StreamsMetricsImplTest {
 
@@ -62,6 +71,60 @@ public void testRemoveSensor() {
 
         final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, 
entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+
+        assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+    }
+
+    @Test
+    public void testMutiLevelSensorRemoval() {
+        final Metrics registry = new Metrics();
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, 
"");
+        for (final MetricName defaultMetric : registry.metrics().keySet()) {
+            registry.removeMetric(defaultMetric);
+        }
+
+        final String taskName = "taskName";
+        final String operation = "operation";
+        final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));
+
+        final String processorNodeName = "processorNodeName";
+        final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
+
+        final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
+        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+
+        final int numberOfTaskMetrics = registry.metrics().size();
+
+        final Sensor sensor1 = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
+        addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
+        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+
+        assertThat(registry.metrics().size(), 
greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, 
Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, 
operation);
+        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, 
taskTags, operation);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor sensor2 = metrics.nodeLevelSensor(taskName, 
processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
+        addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, 
operation);
+        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, 
nodeTags, operation);
+
+        assertThat(registry.metrics().size(), 
greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        metrics.removeAllTaskLevelSensors(taskName);
+
+        assertThat(registry.metrics().size(), equalTo(0));
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> --------------------------------
>
>                 Key: KAFKA-7660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7660
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Patrik Kleindl
>            Assignee: John Roesler
>            Priority: Minor
>             Fix For: 2.2.0
>
>         Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
> Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
> Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> An analysis of JVM memory revealed two possible issues that I would like
> to bring to your attention:
> 1) Duplicate strings
> Top findings: 
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681" 
>  
> "stream-processor-node-metrics" seems to be used in Sensors.java as a
> string literal and is not interned.
>  
> 2) The HashMap parentSensors from 
> org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
>  was reported multiple times as suspicious for potentially keeping alive a 
> lot of objects. In our case the reported size was 40-50MB each.
> I haven't looked deeply into the code, but I noticed that the class Sensor.java,
> which is used as a key in the HashMap, does not implement the equals or hashCode
> methods. I'm not sure whether this is a problem, though.
>  
> The analysis was done with Dynatrace 7.0.
> We are running Confluent 5.0/Kafka 2.0-cp1 (brokers as well as clients).
>  
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to