[ https://issues.apache.org/jira/browse/KAFKA-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637198#comment-16637198 ]
ASF GitHub Bot commented on KAFKA-6123: --------------------------------------- cmccabe closed pull request #5383: KAFKA-6123: Give client MetricsReporter auto-generated client.id URL: https://github.com/apache/kafka/pull/5383 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7abe7efd15b..ceebc58301c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -337,7 +337,8 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG), config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG)); List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); + MetricsReporter.class, + Collections.singletonMap(AdminClientConfig.CLIENT_ID_CONFIG, clientId)); Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 4cdc4f862ec..04b8ec21591 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -677,7 +677,8 
@@ private KafkaConsumer(ConsumerConfig config, .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricsTags); List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); + MetricsReporter.class, + Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); @@ -2211,4 +2212,8 @@ private void throwIfNoAssignorsConfigured() { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property"); } + // Visible for testing + String getClientId() { + return clientId; + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b3f76eee49f..e249c12d5cc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -352,7 +352,8 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); + MetricsReporter.class, + Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics); @@ -1202,6 +1203,11 @@ private void throwIfNoTransactionManager() { "by setting the " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " configuration property"); } + // Visible for 
testing + String getClientId() { + return clientId; + } + private static class ClusterAndWaitTime { final Cluster cluster; final long waitedOnMetadataMs; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8b8cea6cce1..2a3cbe0b72d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -137,6 +137,20 @@ @Rule public ExpectedException expectedException = ExpectedException.none(); + @Test + public void testMetricsReporterAutoGeneratedClientId() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + KafkaConsumer<String, String> consumer = new KafkaConsumer<>( + props, new StringDeserializer(), new StringDeserializer()); + + MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) consumer.metrics.reporters().get(0); + + Assert.assertEquals(consumer.getClientId(), mockMetricsReporter.clientId); + consumer.close(); + } + @Test public void testConstructorClose() { Properties props = new Properties(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 676aafdc57d..dc6fe9f52fc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -76,6 +76,20 @@ @PowerMockIgnore("javax.management.*") public class KafkaProducerTest { + @Test + public void testMetricsReporterAutoGeneratedClientId() { + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + 
props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); + KafkaProducer<String, String> producer = new KafkaProducer<>( + props, new StringSerializer(), new StringSerializer()); + + MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0); + + Assert.assertEquals(producer.getClientId(), mockMetricsReporter.clientId); + producer.close(); + } + @Test public void testConstructorWithSerializers() { Properties producerProps = new Properties(); diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java index b5f3855034f..40521f583dd 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java +++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.test; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; @@ -26,6 +27,7 @@ public class MockMetricsReporter implements MetricsReporter { public static final AtomicInteger INIT_COUNT = new AtomicInteger(0); public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0); + public String clientId; public MockMetricsReporter() { } @@ -48,5 +50,6 @@ public void close() { @Override public void configure(Map<String, ?> configs) { + clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG); } } \ No newline at end of file diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index 525ce7e2de9..aeed060d83e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -88,7 +88,9 @@ public WorkerGroupMember(DistributedConfig config, MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .tags(metricsTags); - List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); + List<MetricsReporter> reporters = config.getConfiguredInstances(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class, + Collections.singletonMap(CommonClientConfigs.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 82323d9081d..da1a1c4f4f2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -659,7 +659,8 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); + MetricsReporter.class, + Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); metrics = new Metrics(metricConfig, reporters, time); ---------------------------------------------------------------- This is an 
automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Give client MetricsReporter auto-generated client.id > ---------------------------------------------------- > > Key: KAFKA-6123 > URL: https://issues.apache.org/jira/browse/KAFKA-6123 > Project: Kafka > Issue Type: Bug > Components: clients, metrics > Reporter: Kevin Lu > Assignee: Kevin Lu > Priority: Minor > Labels: clients, metrics, newbie++ > > The KAFKA-4756 bugfix resolved the broker's KafkaMetricsReporter missing auto-generated > broker ids, but this was not fixed on the client side. > > Metric reporters configured for clients should also be given the > auto-generated client id in the `configure` method. > The interceptors already receive the auto-generated client id. -- This message was sent by Atlassian JIRA (v7.6.3#76005)