[ https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704243#comment-16704243 ]
ASF GitHub Bot commented on KAFKA-7660: --------------------------------------- guozhangwang closed pull request #5974: KAFKA-7660: Fix child sensor memory leak URL: https://github.com/apache/kafka/pull/5974 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index a6da9f90397..9e2b6f18f0c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -41,6 +41,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyList; + /** * A registry of sensors and metrics. * <p> @@ -446,6 +448,9 @@ public void removeSensor(String name) { removeMetric(metric.metricName()); log.debug("Removed sensor with name {}", name); childSensors = childrenSensors.remove(sensor); + for (final Sensor parent : sensor.parents()) { + childrenSensors.getOrDefault(parent, emptyList()).remove(sensor); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index ccbe8aad9cd..1af9419bc75 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -32,6 +31,9 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; +import static 
java.util.Collections.unmodifiableList; + /** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set @@ -133,6 +135,10 @@ public String name() { return this.name; } + List<Sensor> parents() { + return unmodifiableList(asList(parents)); + } + /** * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)} */ @@ -291,7 +297,7 @@ public boolean hasExpired() { } synchronized List<KafkaMetric> metrics() { - return Collections.unmodifiableList(new LinkedList<>(this.metrics.values())); + return unmodifiableList(new LinkedList<>(this.metrics.values())); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 3184aeb8260..98b468afc3d 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.metrics; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -202,6 +204,20 @@ public void testBadSensorHierarchy() { metrics.sensor("gc", c1, c2); // should fail } + @Test + public void testRemoveChildSensor() { + final Metrics metrics = new Metrics(); + + final Sensor parent = metrics.sensor("parent"); + final Sensor child = metrics.sensor("child", parent); + + assertEquals(singletonList(child), metrics.childrenSensors().get(parent)); + + metrics.removeSensor("child"); + + assertEquals(emptyList(), metrics.childrenSensors().get(parent)); + } + @Test public void testRemoveSensor() { int size = metrics.metrics().size(); 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Stream Metrics - Memory Analysis > -------------------------------- > > Key: KAFKA-7660 > URL: https://issues.apache.org/jira/browse/KAFKA-7660 > Project: Kafka > Issue Type: Bug > Components: metrics, streams > Affects Versions: 2.0.0 > Reporter: Patrik Kleindl > Priority: Minor > Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, > Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, > Mem_References.jpeg, heapdump-1543441898901.hprof > > > During the analysis of JVM memory, two possible issues were found that I > would like to bring to your attention: > 1) Duplicate strings > Top findings: > string_content="stream-processor-node-metrics" count="534,277" > string_content="processor-node-id" count="148,437" > string_content="stream-rocksdb-state-metrics" count="41,832" > string_content="punctuate-latency-avg" count="29,681" > > "stream-processor-node-metrics" seems to be used in Sensors.java as a > literal and not interned. > > 2) The HashMap parentSensors from > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > was reported multiple times as suspicious for potentially keeping alive a > lot of objects. In our case the reported size was 40-50MB each. > I haven't looked too deeply into the code but noticed that the class Sensor.java > which is used as a key in the HashMap does not implement the equals or hashCode > method. I am not sure whether this is a problem, though. > > The analysis was done with Dynatrace 7.0 > We are running Confluent 5.0/Kafka2.0-cp1 (Brokers as well as Clients) > > Screenshots are attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)