This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new cc43e77  MINOR: make Sensor#add idempotent (#4853)
cc43e77 is described below

commit cc43e77bbbfad71883011186de55603c936cbcd1
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Wed Apr 11 22:50:10 2018 -0500

    MINOR: make Sensor#add idempotent (#4853)

    This change makes adding a metric to a sensor idempotent. That is, if the metric is already added to the sensor, the method returns with success.

    The current behavior is that any attempt to register a second metric with the same name is an error.

    Testing strategy: There is a new unit test covering this behavior

    Reviewers: Guozhang Wang <guozh...@confluent.io>, Bill Bejeck <b...@confluent.io>
---
 .../org/apache/kafka/common/metrics/Sensor.java    | 53 +++++++++++++---------
 .../apache/kafka/common/metrics/SensorTest.java    | 48 ++++++++++++++++----
 .../kafka/connect/runtime/ConnectMetricsTest.java  | 25 +++++-----
 3 files changed, 83 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 06c8c7f..22f273d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -24,8 +24,11 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -40,7 +43,7 @@ public final class Sensor {
     private final String name;
     private final Sensor[] parents;
     private final List<Stat> stats;
-    private final List<KafkaMetric> metrics;
+    private final Map<MetricName, KafkaMetric> metrics;
     private final MetricConfig config;
     private final Time time;
     private volatile long lastRecordTime;
@@ -103,7 +106,7 @@ public final class Sensor {
         this.registry = registry;
         this.name = Utils.notNull(name);
         this.parents = parents == null ? new Sensor[0] : parents;
-        this.metrics = new ArrayList<>();
+        this.metrics = new LinkedHashMap<>();
         this.stats = new ArrayList<>();
         this.config = config;
         this.time = time;
@@ -190,7 +193,7 @@ public final class Sensor {
     }
 
     public void checkQuotas(long timeMs) {
-        for (KafkaMetric metric : this.metrics) {
+        for (KafkaMetric metric : this.metrics.values()) {
             MetricConfig config = metric.config();
             if (config != null) {
                 Quota quota = config.quota();
@@ -228,9 +231,11 @@ public final class Sensor {
         this.stats.add(Utils.notNull(stat));
         Object lock = new Object();
         for (NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
-            this.registry.registerMetric(metric);
-            this.metrics.add(metric);
+            final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), config == null ? this.config : config, time);
+            if (!metrics.containsKey(metric.metricName())) {
+                registry.registerMetric(metric);
+                metrics.put(metric.metricName(), metric);
+            }
         }
         return true;
     }
@@ -247,24 +252,30 @@ public final class Sensor {
 
     /**
      * Register a metric with this sensor
+     *
      * @param metricName The name of the metric
-     * @param stat The statistic to keep
-     * @param config A special configuration for this metric. If null use the sensor default configuration.
+     * @param stat       The statistic to keep
+     * @param config     A special configuration for this metric. If null use the sensor default configuration.
      * @return true if metric is added to sensor, false if sensor is expired
      */
-    public synchronized boolean add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
-        if (hasExpired())
+    public synchronized boolean add(final MetricName metricName, final MeasurableStat stat, final MetricConfig config) {
+        if (hasExpired()) {
             return false;
-
-        KafkaMetric metric = new KafkaMetric(new Object(),
-                                             Utils.notNull(metricName),
-                                             Utils.notNull(stat),
-                                             config == null ? this.config : config,
-                                             time);
-        this.registry.registerMetric(metric);
-        this.metrics.add(metric);
-        this.stats.add(stat);
-        return true;
+        } else if (metrics.containsKey(metricName)) {
+            return true;
+        } else {
+            final KafkaMetric metric = new KafkaMetric(
+                new Object(),
+                Utils.notNull(metricName),
+                Utils.notNull(stat),
+                config == null ? this.config : config,
+                time
+            );
+            registry.registerMetric(metric);
+            metrics.put(metric.metricName(), metric);
+            stats.add(stat);
+            return true;
+        }
     }
 
     /**
@@ -276,6 +287,6 @@ public final class Sensor {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
+        return Collections.unmodifiableList(new LinkedList<>(this.metrics.values()));
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 3f7551e..a1c0814 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -16,23 +16,25 @@
  */
 package org.apache.kafka.common.metrics;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import
org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class SensorTest {
     @Test
     public void testRecordLevelEnum() {
@@ -94,4 +96,32 @@ public class SensorTest {
 
         metrics.close();
     }
+
+    @Test
+    public void testIdempotentAdd() {
+        final Metrics metrics = new Metrics();
+        final Sensor sensor = metrics.sensor("sensor");
+
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));
+
+        // adding the same metric to the same sensor is a no-op
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Avg()));
+
+
+        // but adding the same metric to a DIFFERENT sensor is an error
+        final Sensor anotherSensor = metrics.sensor("another-sensor");
+        try {
+            anotherSensor.add(metrics.metricName("test-metric", "test-group"), new Avg());
+            fail("should have thrown");
+        } catch (final IllegalArgumentException ignored) {
+            // pass
+        }
+
+        // note that adding a different metric with the same name is also a no-op
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), new Sum()));
+
+        // so after all this, we still just have the original metric registered
+        assertEquals(1, sensor.metrics().size());
+        assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, sensor.metrics().get(0).measurable().getClass());
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 60bd863..25395f9
100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -20,11 +20,10 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
-import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -145,21 +144,21 @@ public class ConnectMetricsTest {
 
     @Test
     public void testRecreateWithClose() {
-        int numMetrics = addToGroup(metrics, false);
-        int numMetricsInRecreatedGroup = addToGroup(metrics, true);
-        Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+        final Sensor originalSensor = addToGroup(metrics, false);
+        final Sensor recreatedSensor = addToGroup(metrics, true);
+        // because we closed the metricGroup, we get a brand-new sensor
+        assertNotSame(originalSensor, recreatedSensor);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void testRecreateWithoutClose() {
-        int numMetrics = addToGroup(metrics, false);
-        int numMetricsInRecreatedGroup = addToGroup(metrics, false);
-        // we should never get here
-        throw new RuntimeException("Created " + numMetricsInRecreatedGroup
-                + " metrics in recreated group. 
Original=" + numMetrics);
+        final Sensor originalSensor = addToGroup(metrics, false);
+        final Sensor recreatedSensor = addToGroup(metrics, false);
+        // since we didn't close the group, the second addToGroup is idempotent
+        assertSame(originalSensor, recreatedSensor);
     }
 
-    private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
+    private Sensor addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) {
         ConnectMetricsRegistry registry = connectMetrics.registry();
         ConnectMetrics.MetricGroup metricGroup = connectMetrics.group(registry.taskGroupName(),
                 registry.connectorTagName(), "conn_name");
@@ -172,7 +171,7 @@ public class ConnectMetricsTest {
         sensor.add(metricName("x1"), new Max());
         sensor.add(metricName("y2"), new Avg());
 
-        return metricGroup.metrics().metrics().size();
+        return sensor;
     }
 
     static MetricName metricName(String name) {

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.