This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b7d95da KAFKA-7660: Fix child sensor memory leak (#5974) b7d95da is described below commit b7d95da88df3a521870fa4f6d6e53cee7e679d40 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Thu Nov 29 22:10:24 2018 -0600 KAFKA-7660: Fix child sensor memory leak (#5974) A heap dump provided by Patrik Kleindl in https://issues.apache.org/jira/browse/KAFKA-7660 identifies the childrenSensors map in Metrics as keeping references to sensors alive after they have been removed. This PR fixes it and adds a test to be sure. Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../java/org/apache/kafka/common/metrics/Metrics.java | 5 +++++ .../java/org/apache/kafka/common/metrics/Sensor.java | 10 ++++++++-- .../org/apache/kafka/common/metrics/MetricsTest.java | 16 ++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index a6da9f9..9e2b6f1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -41,6 +41,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static java.util.Collections.emptyList; + /** * A registry of sensors and metrics. * <p> @@ -446,6 +448,9 @@ public class Metrics implements Closeable { removeMetric(metric.metricName()); log.debug("Removed sensor with name {}", name); childSensors = childrenSensors.remove(sensor); + for (final Sensor parent : sensor.parents()) { + childrenSensors.getOrDefault(parent, emptyList()).remove(sensor); + } } } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index ccbe8aa..1af9419 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -32,6 +31,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + /** * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set @@ -133,6 +135,10 @@ public final class Sensor { return this.name; } + List<Sensor> parents() { + return unmodifiableList(asList(parents)); + } + /** * Record an occurrence, this is just short-hand for {@link #record(double) record(1.0)} */ @@ -291,7 +297,7 @@ public final class Sensor { } synchronized List<KafkaMetric> metrics() { - return Collections.unmodifiableList(new LinkedList<>(this.metrics.values())); + return unmodifiableList(new LinkedList<>(this.metrics.values())); } /** diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 3184aeb..98b468a 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.metrics; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -203,6 +205,20 @@ public class MetricsTest { } @Test + public void testRemoveChildSensor() { + final Metrics metrics = new Metrics(); + + final Sensor parent = metrics.sensor("parent"); + final Sensor child = metrics.sensor("child", parent); + + assertEquals(singletonList(child), metrics.childrenSensors().get(parent)); + + metrics.removeSensor("child"); + + assertEquals(emptyList(), metrics.childrenSensors().get(parent)); + } + + @Test public void testRemoveSensor() { int size = metrics.metrics().size(); Sensor parent1 = metrics.sensor("test.parent1");