This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 5cab11cf52 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121) 5cab11cf52 is described below commit 5cab11cf525f6c06fcf9eb43f7f95ef33fe1cdbb Author: vamossagar12 <sagarmeansoc...@gmail.com> AuthorDate: Mon Jun 13 23:06:39 2022 +0530 KAFKA-13846: Adding overloaded metricOrElseCreate method (#12121) Reviewers: David Jacot <dja...@confluent.io>, Justine Olshan <jols...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../org/apache/kafka/common/metrics/Metrics.java | 40 +++++++++++++++++-- .../org/apache/kafka/common/metrics/Sensor.java | 10 ++++- .../kafka/connect/runtime/ConnectMetrics.java | 8 +--- .../internals/metrics/StreamsMetricsImplTest.java | 46 ++++++++++++++++++++++ 4 files changed, 92 insertions(+), 12 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 52b7794a4c..398819016c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -509,7 +509,10 @@ public class Metrics implements Closeable { Objects.requireNonNull(metricValueProvider), config == null ? this.config : config, time); - registerMetric(m); + KafkaMetric existingMetric = registerMetric(m); + if (existingMetric != null) { + throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); + } } /** @@ -524,6 +527,26 @@ public class Metrics implements Closeable { addMetric(metricName, null, metricValueProvider); } + /** + * Create or get an existing metric to monitor an object that implements MetricValueProvider. + * This metric won't be associated with any sensor. This is a way to expose existing values as metrics. + * This method takes care of synchronisation while updating/accessing metrics by concurrent threads. + * + * @param metricName The name of the metric + * @param metricValueProvider The metric value provider associated with this metric + * @return Existing KafkaMetric if already registered or else a newly created one + */ + public KafkaMetric addMetricIfAbsent(MetricName metricName, MetricConfig config, MetricValueProvider<?> metricValueProvider) { + KafkaMetric metric = new KafkaMetric(new Object(), + Objects.requireNonNull(metricName), + Objects.requireNonNull(metricValueProvider), + config == null ? this.config : config, + time); + + KafkaMetric existingMetric = registerMetric(metric); + return existingMetric == null ? metric : existingMetric; + } + /** * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval` * will be invoked for each reporter. @@ -563,10 +586,18 @@ public class Metrics implements Closeable { } } - synchronized void registerMetric(KafkaMetric metric) { + /** + * Register a metric if not present or return an already existing metric otherwise. + * When a metric is newly registered, this method returns null + * + * @param metric The KafkaMetric to register + * @return KafkaMetric if the metric already exists, null otherwise + */ + synchronized KafkaMetric registerMetric(KafkaMetric metric) { MetricName metricName = metric.metricName(); - if (this.metrics.containsKey(metricName)) - throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); + if (this.metrics.containsKey(metricName)) { + return this.metrics.get(metricName); + } this.metrics.put(metricName, metric); for (MetricsReporter reporter : reporters) { try { @@ -576,6 +607,7 @@ public class Metrics implements Closeable { } } log.trace("Registered metric named {}", metricName); + return null; } /** diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 5ae3b8d997..25f3c21a31 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -297,7 +297,10 @@ public final class Sensor { for (NamedMeasurable m : stat.stats()) { final KafkaMetric metric = new KafkaMetric(lock, m.name(), m.stat(), statConfig, time); if (!metrics.containsKey(metric.metricName())) { - registry.registerMetric(metric); + KafkaMetric existingMetric = registry.registerMetric(metric); + if (existingMetric != null) { + throw new IllegalArgumentException("A metric named '" + metric.metricName() + "' already exists, can't register another one."); + } metrics.put(metric.metricName(), metric); } } @@ -336,7 +339,10 @@ public final class Sensor { statConfig, time ); - registry.registerMetric(metric); + KafkaMetric existingMetric = registry.registerMetric(metric); + if (existingMetric != null) { + throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one."); + } metrics.put(metric.metricName(), metric); stats.add(new StatAndConfig(Objects.requireNonNull(stat), metric::config)); return true; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java index 7dad6aec0a..ed81be657a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java @@ -319,9 +319,7 @@ public class ConnectMetrics { */ public <T> void addValueMetric(MetricNameTemplate nameTemplate, final LiteralSupplier<T> supplier) { MetricName metricName = metricName(nameTemplate); - if (metrics().metric(metricName) == null) { - metrics().addMetric(metricName, (Gauge<T>) (config, now) -> supplier.metricValue(now)); - } + metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> supplier.metricValue(now)); } /** @@ -333,9 +331,7 @@ public class ConnectMetrics { */ public <T> void addImmutableValueMetric(MetricNameTemplate nameTemplate, final T value) { MetricName metricName = metricName(nameTemplate); - if (metrics().metric(metricName) == null) { - metrics().addMetric(metricName, (Gauge<T>) (config, now) -> value); - } + metrics().addMetricIfAbsent(metricName, null, (Gauge<T>) (config, now) -> value); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index b8d3d92e62..5ec834a11f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -84,6 +85,8 @@ import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.powermock.api.easymock.PowerMock.createMock; @RunWith(PowerMockRunner.class) @@ -497,6 +500,17 @@ public class StreamsMetricsImplTest { verify(metrics); } + @Test + public void shouldCreateNewStoreLevelMutableMetric() { + final MetricName metricName = + new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); + final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); + final Metrics metrics = new Metrics(metricConfig); + assertNull(metrics.metric(metricName)); + metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER); + assertNotNull(metrics.metric(metricName)); + } + @Test public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() { final Metrics metrics = mock(Metrics.class); @@ -521,6 +535,38 @@ public class StreamsMetricsImplTest { verify(metrics); } + @Test + public void shouldReturnSameMetricIfAlreadyCreated() { + final MetricName metricName = + new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); + final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); + final Metrics metrics = new Metrics(metricConfig); + assertNull(metrics.metric(metricName)); + final KafkaMetric kafkaMetric = metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER); + assertEquals(kafkaMetric, metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER)); + } + + @Test + public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws InterruptedException { + final MetricName metricName = + new MetricName(METRIC_NAME1, STATE_STORE_LEVEL_GROUP, DESCRIPTION1, STORE_LEVEL_TAG_MAP); + final MetricConfig metricConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL); + final Metrics metrics = new Metrics(metricConfig); + assertNull(metrics.metric(metricName)); + final AtomicReference<KafkaMetric> metricCreatedViaThread1 = new AtomicReference<>(); + final AtomicReference<KafkaMetric> metricCreatedViaThread2 = new AtomicReference<>(); + + final Thread thread1 = new Thread(() -> metricCreatedViaThread1.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER))); + final Thread thread2 = new Thread(() -> metricCreatedViaThread2.set(metrics.addMetricIfAbsent(metricName, metricConfig, VALUE_PROVIDER))); + + thread1.start(); + thread2.start(); + + thread1.join(); + thread2.join(); + assertEquals(metricCreatedViaThread1.get(), metricCreatedViaThread2.get()); + } + @Test public void shouldRemoveStateStoreLevelSensors() { final Metrics metrics = niceMock(Metrics.class);