This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 76e089142b1 KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) (#15682) 76e089142b1 is described below commit 76e089142b14391f6381b684b27f50158c07be4b Author: Apoorv Mittal <amit...@confluent.io> AuthorDate: Fri Apr 19 19:53:12 2024 +0100 KAFKA-16486: Integrate KIP-1019 measurability changes (KIP-714) (#15682) This PR leverages the changes defined in KIP-1019: it removes the reflection-based access to the KafkaMetric field and instead uses the method exposed by KIP-1019 to check metric measurability. Reviewers: Andrew Schofield <aschofi...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../telemetry/internals/KafkaMetricsCollector.java | 30 +--------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java index a888e8997ab..e18a8d54ea1 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.telemetry.internals; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.KafkaMetric; @@ -38,7 +37,6 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Field; import java.time.Instant; import java.util.List; import java.util.Map; @@ -131,17 +129,6 @@ public class KafkaMetricsCollector implements MetricsCollector { private final MetricNamingStrategy<MetricName> metricNamingStrategy; private final Set<String> excludeLabels; - private static final Field METRIC_VALUE_PROVIDER_FIELD; - - static { - try { - 
METRIC_VALUE_PROVIDER_FIELD = KafkaMetric.class.getDeclaredField("metricValueProvider"); - METRIC_VALUE_PROVIDER_FIELD.setAccessible(true); - } catch (Exception e) { - throw new KafkaException(e); - } - } - public KafkaMetricsCollector(MetricNamingStrategy<MetricName> metricNamingStrategy, Set<String> excludeLabels) { this(metricNamingStrategy, Time.SYSTEM, excludeLabels); } @@ -211,7 +198,7 @@ public class KafkaMetricsCollector implements MetricsCollector { } Instant now = Instant.ofEpochMilli(time.milliseconds()); - if (isMeasurable(metric)) { + if (metric.isMeasurable()) { Measurable measurable = metric.measurable(); Double value = (Double) metricValue; @@ -262,21 +249,6 @@ public class KafkaMetricsCollector implements MetricsCollector { ); } - private static boolean isMeasurable(KafkaMetric metric) { - // KafkaMetric does not expose the internal MetricValueProvider and throws an IllegalStateException - // exception, if measurable() is called for a Gauge. - // There are 2 ways to find the type of internal MetricValueProvider for a KafkaMetric - use reflection or - // get the information based on whether a IllegalStateException exception is thrown. - // We use reflection so that we can avoid the cost of generating the stack trace when it's - // not a measurable. - try { - Object provider = METRIC_VALUE_PROVIDER_FIELD.get(metric); - return provider instanceof Measurable; - } catch (Exception e) { - throw new KafkaException(e); - } - } - /** * Keeps track of the state of metrics, e.g. when they were added, what their getAndSet value is, * and clearing them out when they're removed.