[ https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710811#comment-16710811 ]
ASF GitHub Bot commented on KAFKA-7660: --------------------------------------- mjsax closed pull request #5982: KAFKA-7660: fix streams and Metrics memory leaks URL: https://github.com/apache/kafka/pull/5982 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index a9d80f1dde3..ac1ffaf9418 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -446,6 +446,12 @@ public void removeSensor(String name) { removeMetric(metric.metricName()); log.debug("Removed sensor with name {}", name); childSensors = childrenSensors.remove(sensor); + for (final Sensor parent : sensor.parents()) { + final List<Sensor> peers = childrenSensors.get(parent); + if (peers != null) { + peers.remove(sensor); + } + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 321fab661cd..c845ff80895 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -22,13 +22,15 @@ import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + /** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. 
For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set @@ -132,6 +134,10 @@ public String name() { return this.name; } + List<Sensor> parents() { + return unmodifiableList(asList(parents)); + } + /** * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)} */ @@ -267,6 +273,6 @@ public boolean hasExpired() { } synchronized List<KafkaMetric> metrics() { - return Collections.unmodifiableList(this.metrics); + return unmodifiableList(this.metrics); } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 216493f84a6..3db46e2fb72 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.metrics; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -177,6 +179,20 @@ public void testBadSensorHierarchy() { metrics.sensor("gc", c1, c2); // should fail } + @Test + public void testRemoveChildSensor() { + final Metrics metrics = new Metrics(); + + final Sensor parent = metrics.sensor("parent"); + final Sensor child = metrics.sensor("child", parent); + + assertEquals(singletonList(child), metrics.childrenSensors().get(parent)); + + metrics.removeSensor("child"); + + assertEquals(emptyList(), metrics.childrenSensors().get(parent)); + } + @Test public void testRemoveSensor() { int size = metrics.metrics().size(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index 
cf25dd10fda..7b50feeaf8d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -222,10 +222,9 @@ public void removeSensor(Sensor sensor) { Objects.requireNonNull(sensor, "Sensor is null"); metrics.removeSensor(sensor.name()); - final Sensor parent = parentSensors.get(sensor); + final Sensor parent = parentSensors.remove(sensor); if (parent != null) { metrics.removeSensor(parent.name()); - parentSensors.remove(sensor); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Stream Metrics - Memory Analysis > -------------------------------- > > Key: KAFKA-7660 > URL: https://issues.apache.org/jira/browse/KAFKA-7660 > Project: Kafka > Issue Type: Bug > Components: metrics, streams > Affects Versions: 2.0.0 > Reporter: Patrik Kleindl > Assignee: John Roesler > Priority: Minor > Fix For: 2.2.0 > > Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, > Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, > Mem_References.jpeg, heapdump-1543441898901.hprof > > > During the analysis of JVM memory two possible issues were shown which I > would like to bring to your attention: > 1) Duplicate strings > Top findings: > string_content="stream-processor-node-metrics" count="534,277" > string_content="processor-node-id" count="148,437" > string_content="stream-rocksdb-state-metrics" count="41,832" > string_content="punctuate-latency-avg" count="29,681" > > "stream-processor-node-metrics" seems to be used in Sensors.java as a > literal and not interned. 
> > 2) The HashMap parentSensors from > org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl > was reported multiple times as suspicious for potentially keeping a lot of objects > alive. In our case the reported size was 40-50 MB each. > I haven't looked too deeply into the code, but I noticed that the class Sensor.java, > which is used as a key in the HashMap, does not implement the equals or hashCode > methods. I'm not sure whether this is a problem, though. > > The analysis was done with Dynatrace 7.0. > We are running Confluent 5.0/Kafka 2.0-cp1 (brokers as well as clients). > > Screenshots are attached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)