[
https://issues.apache.org/jira/browse/KAFKA-6765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440871#comment-16440871
]
ASF GitHub Bot commented on KAFKA-6765:
---
rajinisivaram closed pull request #4869: KAFKA-6765: Handle exception while
reading throttle metric value in test
URL: https://github.com/apache/kafka/pull/4869
This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index ea18cd3fef8..dee69f5a345 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -487,7 +487,8 @@ public void addMetric(MetricName metricName, MetricConfig
config, Measurable mea
/**
* Add a metric to monitor an object that implements MetricValueProvider.
This metric won't be associated with any
- * sensor. This is a way to expose existing values as metrics.
+ * sensor. This is a way to expose existing values as metrics. User is
expected to add any additional
+ * synchronization to update and access metric values, if required.
*
* @param metricName The name of the metric
* @param metricValueProvider The metric value provider associated with
this metric
@@ -503,7 +504,8 @@ public void addMetric(MetricName metricName, MetricConfig
config, MetricValuePro
/**
* Add a metric to monitor an object that implements MetricValueProvider.
This metric won't be associated with any
- * sensor. This is a way to expose existing values as metrics.
+ * sensor. This is a way to expose existing values as metrics. User is
expected to add any additional
+ * synchronization to update and access metric values, if required.
*
* @param metricName The name of the metric
* @param metricValueProvider The metric value provider associated with
this metric
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 22f273d9d93..e4bf1aeee69 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -229,7 +229,7 @@ public synchronized boolean add(CompoundStat stat,
MetricConfig config) {
return false;
this.stats.add(Utils.notNull(stat));
-Object lock = new Object();
+Object lock = metricLock(stat);
for (NamedMeasurable m : stat.stats()) {
final KafkaMetric metric = new KafkaMetric(lock, m.name(),
m.stat(), config == null ? this.config : config, time);
if (!metrics.containsKey(metric.metricName())) {
@@ -265,7 +265,7 @@ public synchronized boolean add(final MetricName
metricName, final MeasurableSta
return true;
} else {
final KafkaMetric metric = new KafkaMetric(
-new Object(),
+metricLock(stat),
Utils.notNull(metricName),
Utils.notNull(stat),
config == null ? this.config : config,
@@ -289,4 +289,12 @@ public boolean hasExpired() {
synchronized List metrics() {
return Collections.unmodifiableList(new
LinkedList<>(this.metrics.values()));
}
+
+/**
+ * KafkaMetrics of sensors which use SampledStat should be synchronized on
the Sensor object
+ * to allow concurrent reads and updates. For simplicity, all sensors are
synchronized on Sensor.
+ */
+private Object metricLock(Stat stat) {
+return this;
+}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 55f8e2349bf..6acc39d35a6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -24,9 +24,16 @@
import static org.junit.Assert.fail;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@@ -39,8 +46,10 @@
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stat