[ https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710810#comment-16710810 ]
ASF GitHub Bot commented on KAFKA-7660: --------------------------------------- mjsax closed pull request #5981: KAFKA-7660: fix streams and Metrics memory leaks URL: https://github.com/apache/kafka/pull/5981 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index e83085e7132..803fd7c9cca 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -446,6 +446,12 @@ public void removeSensor(String name) { removeMetric(metric.metricName()); log.debug("Removed sensor with name {}", name); childSensors = childrenSensors.remove(sensor); + for (final Sensor parent : sensor.parents()) { + final List<Sensor> peers = childrenSensors.get(parent); + if (peers != null) { + peers.remove(sensor); + } + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 7ee23d31f47..16f33c2d095 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -22,13 +22,15 @@ import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + /** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. 
For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set @@ -134,6 +136,10 @@ public String name() { return this.name; } + List<Sensor> parents() { + return unmodifiableList(asList(parents)); + } + /** * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)} */ @@ -271,7 +277,7 @@ public boolean hasExpired() { } synchronized List<KafkaMetric> metrics() { - return Collections.unmodifiableList(this.metrics); + return unmodifiableList(this.metrics); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 7a973daf043..7dd780d4497 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.metrics; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -197,6 +199,20 @@ public void testBadSensorHierarchy() { metrics.sensor("gc", c1, c2); // should fail } + @Test + public void testRemoveChildSensor() { + final Metrics metrics = new Metrics(); + + final Sensor parent = metrics.sensor("parent"); + final Sensor child = metrics.sensor("child", parent); + + assertEquals(singletonList(child), metrics.childrenSensors().get(parent)); + + metrics.removeSensor("child"); + + assertEquals(emptyList(), metrics.childrenSensors().get(parent)); + } + @Test public void testRemoveSensor() { int size = metrics.metrics().size(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index 
464beca5c5c..59a9d4f249f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -222,10 +222,9 @@ public void removeSensor(Sensor sensor) { Objects.requireNonNull(sensor, "Sensor is null"); metrics.removeSensor(sensor.name()); - final Sensor parent = parentSensors.get(sensor); + final Sensor parent = parentSensors.remove(sensor); if (parent != null) { metrics.removeSensor(parent.name()); - parentSensors.remove(sensor); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Stream Metrics - Memory Analysis > -------------------------------- > > Key: KAFKA-7660 > URL: https://issues.apache.org/jira/browse/KAFKA-7660 > Project: Kafka > Issue Type: Bug > Components: metrics, streams > Affects Versions: 2.0.0 > Reporter: Patrik Kleindl > Assignee: John Roesler > Priority: Minor > Fix For: 2.2.0 > > Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, > Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, > Mem_References.jpeg, heapdump-1543441898901.hprof > > > During the analysis of JVM memory two possible issues were shown which I > would like to bring to your attention: > 1) Duplicate strings > Top findings: > string_content="stream-processor-node-metrics" count="534,277" > string_content="processor-node-id" count="148,437" > string_content="stream-rocksdb-state-metrics" count="41,832" > string_content="punctuate-latency-avg" count="29,681" > > "stream-processor-node-metrics" seems to be used in Sensors.java as a > literal and not interned. 
> > 2) The HashMap parentSensors in > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > was reported multiple times as suspicious for potentially keeping a lot of > objects alive. In our case, the reported size was 40-50 MB each. > I haven't looked too deeply into the code, but I noticed that the class Sensor.java, > which is used as a key in the HashMap, does not implement the equals or hashCode > methods. I am not sure whether this is a problem, though. > > The analysis was done with Dynatrace 7.0. > We are running Confluent 5.0/Kafka 2.0-cp1 (brokers as well as clients). > > Screenshots are attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)