This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cc43e77  MINOR: make Sensor#add idempotent (#4853)
cc43e77 is described below

commit cc43e77bbbfad71883011186de55603c936cbcd1
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Wed Apr 11 22:50:10 2018 -0500

    MINOR: make Sensor#add idempotent (#4853)
    
    This change makes adding a metric to a sensor idempotent.
    That is, if the metric is already added to the sensor, the method
    returns with success.
    
    The current behavior is that any attempt to register a second metric
    with the same name is an error.
    
    Testing strategy: There is a new unit test covering this behavior
    
    Reviewers: Guozhang Wang <guozh...@confluent.io>, Bill Bejeck 
<b...@confluent.io>
---
 .../org/apache/kafka/common/metrics/Sensor.java    | 53 +++++++++++++---------
 .../apache/kafka/common/metrics/SensorTest.java    | 48 ++++++++++++++++----
 .../kafka/connect/runtime/ConnectMetricsTest.java  | 25 +++++-----
 3 files changed, 83 insertions(+), 43 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 06c8c7f..22f273d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -24,8 +24,11 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -40,7 +43,7 @@ public final class Sensor {
     private final String name;
     private final Sensor[] parents;
     private final List<Stat> stats;
-    private final List<KafkaMetric> metrics;
+    private final Map<MetricName, KafkaMetric> metrics;
     private final MetricConfig config;
     private final Time time;
     private volatile long lastRecordTime;
@@ -103,7 +106,7 @@ public final class Sensor {
         this.registry = registry;
         this.name = Utils.notNull(name);
         this.parents = parents == null ? new Sensor[0] : parents;
-        this.metrics = new ArrayList<>();
+        this.metrics = new LinkedHashMap<>();
         this.stats = new ArrayList<>();
         this.config = config;
         this.time = time;
@@ -190,7 +193,7 @@ public final class Sensor {
     }
 
     public void checkQuotas(long timeMs) {
-        for (KafkaMetric metric : this.metrics) {
+        for (KafkaMetric metric : this.metrics.values()) {
             MetricConfig config = metric.config();
             if (config != null) {
                 Quota quota = config.quota();
@@ -228,9 +231,11 @@ public final class Sensor {
         this.stats.add(Utils.notNull(stat));
         Object lock = new Object();
         for (NamedMeasurable m : stat.stats()) {
-            KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), 
config == null ? this.config : config, time);
-            this.registry.registerMetric(metric);
-            this.metrics.add(metric);
+            final KafkaMetric metric = new KafkaMetric(lock, m.name(), 
m.stat(), config == null ? this.config : config, time);
+            if (!metrics.containsKey(metric.metricName())) {
+                registry.registerMetric(metric);
+                metrics.put(metric.metricName(), metric);
+            }
         }
         return true;
     }
@@ -247,24 +252,30 @@ public final class Sensor {
 
     /**
      * Register a metric with this sensor
+     *
      * @param metricName The name of the metric
-     * @param stat The statistic to keep
-     * @param config A special configuration for this metric. If null use the 
sensor default configuration.
+     * @param stat       The statistic to keep
+     * @param config     A special configuration for this metric. If null use 
the sensor default configuration.
      * @return true if metric is added to sensor, false if sensor is expired
      */
-    public synchronized boolean add(MetricName metricName, MeasurableStat 
stat, MetricConfig config) {
-        if (hasExpired())
+    public synchronized boolean add(final MetricName metricName, final 
MeasurableStat stat, final MetricConfig config) {
+        if (hasExpired()) {
             return false;
-
-        KafkaMetric metric = new KafkaMetric(new Object(),
-                                             Utils.notNull(metricName),
-                                             Utils.notNull(stat),
-                                             config == null ? this.config : 
config,
-                                             time);
-        this.registry.registerMetric(metric);
-        this.metrics.add(metric);
-        this.stats.add(stat);
-        return true;
+        } else if (metrics.containsKey(metricName)) {
+            return true;
+        } else {
+            final KafkaMetric metric = new KafkaMetric(
+                new Object(),
+                Utils.notNull(metricName),
+                Utils.notNull(stat),
+                config == null ? this.config : config,
+                time
+            );
+            registry.registerMetric(metric);
+            metrics.put(metric.metricName(), metric);
+            stats.add(stat);
+            return true;
+        }
     }
 
     /**
@@ -276,6 +287,6 @@ public final class Sensor {
     }
 
     synchronized List<KafkaMetric> metrics() {
-        return Collections.unmodifiableList(this.metrics);
+        return Collections.unmodifiableList(new 
LinkedList<>(this.metrics.values()));
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 3f7551e..a1c0814 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -16,23 +16,25 @@
  */
 package org.apache.kafka.common.metrics;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class SensorTest {
     @Test
     public void testRecordLevelEnum() {
@@ -94,4 +96,32 @@ public class SensorTest {
 
         metrics.close();
     }
+
+    @Test
+    public void testIdempotentAdd() {
+        final Metrics metrics = new Metrics();
+        final Sensor sensor = metrics.sensor("sensor");
+
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), 
new Avg()));
+
+        // adding the same metric to the same sensor is a no-op
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), 
new Avg()));
+
+
+        // but adding the same metric to a DIFFERENT sensor is an error
+        final Sensor anotherSensor = metrics.sensor("another-sensor");
+        try {
+            anotherSensor.add(metrics.metricName("test-metric", "test-group"), 
new Avg());
+            fail("should have thrown");
+        } catch (final IllegalArgumentException ignored) {
+            // pass
+        }
+
+        // note that adding a different metric with the same name is also a 
no-op
+        assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"), 
new Sum()));
+
+        // so after all this, we still just have the original metric registered
+        assertEquals(1, sensor.metrics().size());
+        assertEquals(org.apache.kafka.common.metrics.stats.Avg.class, 
sensor.metrics().get(0).measurable().getClass());
+    }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index 60bd863..25395f9 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -20,11 +20,10 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroupId;
-import org.apache.kafka.common.utils.MockTime;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -145,21 +144,21 @@ public class ConnectMetricsTest {
 
     @Test
     public void testRecreateWithClose() {
-        int numMetrics = addToGroup(metrics, false);
-        int numMetricsInRecreatedGroup = addToGroup(metrics, true);
-        Assert.assertEquals(numMetrics, numMetricsInRecreatedGroup);
+        final Sensor originalSensor = addToGroup(metrics, false);
+        final Sensor recreatedSensor = addToGroup(metrics, true);
+        // because we closed the metricGroup, we get a brand-new sensor
+        assertNotSame(originalSensor, recreatedSensor);
     }
 
-    @Test(expected = IllegalArgumentException.class)
+    @Test
     public void testRecreateWithoutClose() {
-        int numMetrics = addToGroup(metrics, false);
-        int numMetricsInRecreatedGroup = addToGroup(metrics, false);
-        // we should never get here
-        throw new RuntimeException("Created " + numMetricsInRecreatedGroup
-                + " metrics in recreated group. Original=" + numMetrics);
+        final Sensor originalSensor = addToGroup(metrics, false);
+        final Sensor recreatedSensor = addToGroup(metrics, false);
+        // since we didn't close the group, the second addToGroup is idempotent
+        assertSame(originalSensor, recreatedSensor);
     }
 
-    private int addToGroup(ConnectMetrics connectMetrics, boolean shouldClose) 
{
+    private Sensor addToGroup(ConnectMetrics connectMetrics, boolean 
shouldClose) {
         ConnectMetricsRegistry registry = connectMetrics.registry();
         ConnectMetrics.MetricGroup metricGroup = 
connectMetrics.group(registry.taskGroupName(),
                 registry.connectorTagName(), "conn_name");
@@ -172,7 +171,7 @@ public class ConnectMetricsTest {
         sensor.add(metricName("x1"), new Max());
         sensor.add(metricName("y2"), new Avg());
 
-        return metricGroup.metrics().metrics().size();
+        return sensor;
     }
 
     static MetricName metricName(String name) {

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to