[ https://issues.apache.org/jira/browse/KAFKA-6986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522514#comment-16522514 ]
ASF GitHub Bot commented on KAFKA-6986: --------------------------------------- guozhangwang closed pull request #5210: KAFKA-6986: Export Admin Client metrics through Stream Threads URL: https://github.com/apache/kafka/pull/5210 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index 0171b617e7c..75c93b63228 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; import org.apache.kafka.common.acl.AclBinding; @@ -768,4 +770,11 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds) { return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions()); } + + /** + * Get the metrics kept by the adminClient + * + * @return + */ + public abstract Map<MetricName, ? extends Metric> metrics(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 495095a9276..016195308b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -35,6 +35,8 @@ import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; @@ -2734,4 +2736,9 @@ void handleFailure(Throwable throwable) { return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures)); } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return Collections.unmodifiableMap(this.metrics.metrics()); + } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 2fc7048b759..6fbdca265c1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -17,6 +17,8 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -47,6 +49,8 @@ private Node controller; private int timeoutNextRequests = 0; + private Map<MetricName, Metric> mockMetrics = new HashMap<>(); + /** * Creates MockAdminClient for a cluster with the given brokers. The Kafka cluster ID uses the default value from * DEFAULT_CLUSTER_ID. @@ -391,4 +395,10 @@ public void close(long duration, TimeUnit unit) {} } } + public void setMockMetrics(MetricName name, Metric metric) { mockMetrics.put(name, metric); } + + @Override + public Map<MetricName, ? extends Metric> metrics() { + return mockMetrics; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6a707ff986d..cef8116e880 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -382,12 +382,12 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState * * @return Map of all metrics. */ - // TODO: we can add metrics for admin client as well public Map<MetricName, ? extends Metric> metrics() { final Map<MetricName, Metric> result = new LinkedHashMap<>(); for (final StreamThread thread : threads) { result.putAll(thread.producerMetrics()); result.putAll(thread.consumerMetrics()); + result.putAll(thread.adminClientMetrics()); } if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics()); result.putAll(metrics.metrics()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index a159e7b6c7a..fbf7fb60e80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1271,4 +1271,11 @@ TaskManager taskManager() { result.putAll(restoreConsumerMetrics); return result; } + + public Map<MetricName, Metric> adminClientMetrics() { + final Map<MetricName, ? extends Metric> adminClientMetrics = taskManager.getAdminClient().metrics(); + final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>(); + result.putAll(adminClientMetrics); + return result; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 44db70d8b60..9da27020c5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -278,6 +278,10 @@ void shutdown(final boolean clean) { } } + AdminClient getAdminClient() { + return adminClient; + } + Set<TaskId> suspendedActiveTaskIds() { return active.previousTaskIds(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 3412c629a56..818a0d8e2ee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -72,6 +73,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -1303,4 +1305,53 @@ public double measure(MetricConfig config, long now) { Map<MetricName, Metric> producerMetrics = thread.producerMetrics(); assertEquals(testMetricName, producerMetrics.get(testMetricName).metricName()); } + + @Test + public void adminClientMetricsVerification() { + final Node broker1 = new Node(0, "dummyHost-1", 1234); + final Node broker2 = new Node(1, "dummyHost-2", 1234); + List<Node> cluster = Arrays.asList(broker1, broker2); + + final MockAdminClient adminClient = new MockAdminClient(cluster, broker1, null); + + final MockProducer<byte[], byte[]> producer = new MockProducer<>(); + final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + + final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, ""); + final StreamThread thread = new StreamThread( + mockTime, + config, + producer, + consumer, + consumer, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext(""), + new AtomicBoolean()); + final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>()); + final Metric testMetric = new KafkaMetric( + new Object(), + testMetricName, + new Measurable() { + @Override + public double measure(MetricConfig config, long now) { + return 0; + } + }, + null, + new MockTime()); + + + EasyMock.expect(taskManager.getAdminClient()).andReturn(adminClient); + EasyMock.expectLastCall(); + EasyMock.replay(taskManager, consumer); + + adminClient.setMockMetrics(testMetricName, testMetric); + Map<MetricName, Metric> adminClientMetrics = thread.adminClientMetrics(); + assertEquals(testMetricName, adminClientMetrics.get(testMetricName).metricName()); + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Export Admin Client metrics through Stream Threads > -------------------------------------------------- > > Key: KAFKA-6986 > URL: https://issues.apache.org/jira/browse/KAFKA-6986 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Boyang Chen > Assignee: Shun Guan > Priority: Minor > Labels: Newcomer, beginner, newbie > > We already exported producer and consumer metrics through KafkaStreams class: > [https://github.com/apache/kafka/pull/4998] > It makes sense to also export the Admin client metrics. > If any new contributor wishes to take over this one, please let me know. I > will revisit and close this ticket in one or two months later in case no one > claims it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)