[ https://issues.apache.org/jira/browse/KAFKA-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16704001#comment-16704001 ]
ASF GitHub Bot commented on KAFKA-7660:
---------------------------------------

guozhangwang closed pull request #5953: KAFKA-7660: fix parentSensors memory leak
URL: https://github.com/apache/kafka/pull/5953

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 8483791e2e7..ec4f8e5e5db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -234,9 +234,11 @@ private static Sensor createTaskAndNodeLatencyAndThroughputSensors(final String
         final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
         addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
         addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
         final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
         addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
         addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
         return sensor;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 8ec2711e764..dd6cc4a2185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -115,11 +115,9 @@ public final Sensor taskLevelSensor(final String taskName,
     public final void removeAllTaskLevelSensors(final String taskName) {
         final String key = taskSensorPrefix(taskName);
         synchronized (taskLevelSensors) {
-            if (taskLevelSensors.containsKey(key)) {
-                while (!taskLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(taskLevelSensors.get(key).pop());
-                }
-                taskLevelSensors.remove(key);
+            final Deque<String> sensors = taskLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -152,10 +150,9 @@ public Sensor nodeLevelSensor(final String taskName,
     public final void removeAllNodeLevelSensors(final String taskName, final String processorNodeName) {
         final String key = nodeSensorPrefix(taskName, processorNodeName);
         synchronized (nodeLevelSensors) {
-            if (nodeLevelSensors.containsKey(key)) {
-                while (!nodeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(nodeLevelSensors.get(key).pop());
-                }
+            final Deque<String> sensors = nodeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -188,11 +185,9 @@ public final Sensor cacheLevelSensor(final String taskName,
     public final void removeAllCacheLevelSensors(final String taskName, final String cacheName) {
         final String key = cacheSensorPrefix(taskName, cacheName);
         synchronized (cacheLevelSensors) {
-            if (cacheLevelSensors.containsKey(key)) {
-                while (!cacheLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(cacheLevelSensors.get(key).pop());
-                }
-                cacheLevelSensors.remove(key);
+            final Deque<String> strings = cacheLevelSensors.remove(key);
+            while (strings != null && !strings.isEmpty()) {
+                metrics.removeSensor(strings.pop());
             }
         }
     }
@@ -225,11 +220,9 @@ public final Sensor storeLevelSensor(final String taskName,
     public final void removeAllStoreLevelSensors(final String taskName, final String storeName) {
         final String key = storeSensorPrefix(taskName, storeName);
         synchronized (storeLevelSensors) {
-            if (storeLevelSensors.containsKey(key)) {
-                while (!storeLevelSensors.get(key).isEmpty()) {
-                    metrics.removeSensor(storeLevelSensors.get(key).pop());
-                }
-                storeLevelSensors.remove(key);
+            final Deque<String> sensors = storeLevelSensors.remove(key);
+            while (sensors != null && !sensors.isEmpty()) {
+                metrics.removeSensor(sensors.pop());
             }
         }
     }
@@ -413,12 +406,19 @@ public void removeSensor(final Sensor sensor) {
         Objects.requireNonNull(sensor, "Sensor is null");
         metrics.removeSensor(sensor.name());
 
-        final Sensor parent = parentSensors.get(sensor);
+        final Sensor parent = parentSensors.remove(sensor);
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
     }
 
+    /**
+     * Visible for testing
+     */
+    Map<Sensor, Sensor> parentSensors() {
+        return Collections.unmodifiableMap(parentSensors);
+    }
+
     private static String groupNameFromScope(final String scopeName) {
         return "stream-" + scopeName + "-metrics";
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
similarity index 59%
rename from streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
rename to streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 7ce27b4b6d6..cadfdb0cef1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
 import org.apache.kafka.common.MetricName;
@@ -23,12 +23,21 @@
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class StreamsMetricsImplTest {
@@ -62,6 +71,60 @@ public void testRemoveSensor() {
         final Sensor sensor3 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor3);
+
+        assertEquals(Collections.emptyMap(), streamsMetrics.parentSensors());
+    }
+
+    @Test
+    public void testMutiLevelSensorRemoval() {
+        final Metrics registry = new Metrics();
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(registry, "");
+        for (final MetricName defaultMetric : registry.metrics().keySet()) {
+            registry.removeMetric(defaultMetric);
+        }
+
+        final String taskName = "taskName";
+        final String operation = "operation";
+        final Map<String, String> taskTags = mkMap(mkEntry("tkey", "value"));
+
+        final String processorNodeName = "processorNodeName";
+        final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
+
+        final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
+        final int numberOfTaskMetrics = registry.metrics().size();
+
+        final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
+        addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
+        assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
+        addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
+        addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+
+        assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
+
+        metrics.removeAllNodeLevelSensors(taskName, processorNodeName);
+
+        assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
+
+        metrics.removeAllTaskLevelSensors(taskName);
+
+        assertThat(registry.metrics().size(), equalTo(0));
     }
 
     @Test
@@ -115,21 +178,21 @@ public void testTotalMetricDoesntDecrease() {
         final String operation = "op";
         final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
-                scope,
-                entity,
-                operation,
-                Sensor.RecordingLevel.INFO
+            scope,
+            entity,
+            operation,
+            Sensor.RecordingLevel.INFO
         );
         final double latency = 100.0;
         final MetricName totalMetricName = metrics.metricName(
-                "op-total",
-                "stream-scope-metrics",
-                "",
-                "client-id",
-                "",
-                "scope-id",
-                "entity"
+            "op-total",
+            "stream-scope-metrics",
+            "",
+            "client-id",
+            "",
+            "scope-id",
+            "entity"
         );
         final KafkaMetric totalMetric = metrics.metric(totalMetricName);

----------------------------------------------------------------

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Stream Metrics - Memory Analysis
> --------------------------------
>
>                 Key: KAFKA-7660
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7660
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Patrik Kleindl
>            Priority: Minor
>         Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, Mem_References.jpeg, heapdump-1543441898901.hprof
>
>
> During an analysis of JVM memory, two possible issues showed up which I would like to bring to your attention:
> 1) Duplicate strings
> Top findings:
> string_content="stream-processor-node-metrics" count="534,277"
> string_content="processor-node-id" count="148,437"
> string_content="stream-rocksdb-state-metrics" count="41,832"
> string_content="punctuate-latency-avg" count="29,681"
>
> "stream-processor-node-metrics" seems to be used in Sensors.java as a literal and not interned.
>
> 2) The HashMap parentSensors from org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl was reported multiple times as suspicious for potentially keeping alive a lot of objects. In our case the reported size was 40-50 MB each.
> I haven't looked too deeply into the code, but I noticed that the class Sensor.java, which is used as a key in the HashMap, does not implement the equals or hashCode methods. I am not sure whether this is a problem, though.
>
> The analysis was done with Dynatrace 7.0.
> We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
>
> Screenshots are attached.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
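The decisive change in the diff above is in StreamsMetricsImpl.removeSensor(): the parent lookup switches from parentSensors.get(sensor) to parentSensors.remove(sensor), so the map entry is dropped together with the sensor itself. The stand-alone sketch below is a simplified illustration, not the actual Kafka Streams code: class, field, and method names are placeholders, and String keys stand in for Sensor objects. It shows why the old pattern accumulates one map entry per removed sensor.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the leak pattern addressed by the PR.
public class ParentSensorLeakSketch {

    // Stand-in for the underlying metrics registry: just tracks live sensor names.
    static final Set<String> REGISTRY = new HashSet<>();

    // Stand-in for StreamsMetricsImpl.parentSensors: child sensor -> parent sensor.
    static final Map<String, String> PARENT_SENSORS = new HashMap<>();

    static void register(final String child, final String parent) {
        REGISTRY.add(child);
        REGISTRY.add(parent);
        PARENT_SENSORS.put(child, parent);
    }

    // Before the fix: the parent mapping is only read, never removed, so the map
    // keeps one entry per child sensor even after both sensors are gone.
    static void removeSensorLeaky(final String child) {
        REGISTRY.remove(child);
        final String parent = PARENT_SENSORS.get(child);   // entry stays in the map
        if (parent != null) {
            REGISTRY.remove(parent);
        }
    }

    // After the fix: remove() looks up and drops the mapping in one step,
    // so the map shrinks together with the registry.
    static void removeSensorFixed(final String child) {
        REGISTRY.remove(child);
        final String parent = PARENT_SENSORS.remove(child); // entry is dropped
        if (parent != null) {
            REGISTRY.remove(parent);
        }
    }

    public static void main(final String[] args) {
        for (int i = 0; i < 1_000; i++) {
            register("node-sensor-" + i, "task-sensor-" + i);
            removeSensorLeaky("node-sensor-" + i);
        }
        System.out.println("leaky: parentSensors size = " + PARENT_SENSORS.size()); // 1000

        PARENT_SENSORS.clear();
        for (int i = 0; i < 1_000; i++) {
            register("node-sensor-" + i, "task-sensor-" + i);
            removeSensorFixed("node-sensor-" + i);
        }
        System.out.println("fixed: parentSensors size = " + PARENT_SENSORS.size()); // 0
    }
}

The removeAll*LevelSensors changes in the diff follow the same principle: Deque<String> sensors = map.remove(key) takes the entry out of the map and hands it back in one step, so the null check replaces the earlier containsKey/get double lookup and a key can no longer be left behind once its sensors are unregistered.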