mjsax commented on code in PR #17021:
URL: https://github.com/apache/kafka/pull/17021#discussion_r1755880241
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1823,6 +1824,35 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}
+
+ /**
+ * An application metric provided for subscription.
+ * This metric will be added to this client's metrics
+ * that are available for subscription and sent as
+ * telemetry data to the broker.
+ * The provided metric must map to an OTLP metric data point
+ * type in the OpenTelemetry v1 metrics protobuf message types.
+ * Specifically, the metric should be one of the following:
+ * - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ * - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ * Metrics not matching these types are silently ignored.
+ * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
Review Comment:
> benign
I just learned a new word :)
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1823,6 +1824,35 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}
+
+ /**
+ * An application metric provided for subscription.
+ * This metric will be added to this client's metrics
+ * that are available for subscription and sent as
+ * telemetry data to the broker.
+ * The provided metric must map to an OTLP metric data point
+ * type in the OpenTelemetry v1 metrics protobuf message types.
+ * Specifically, the metric should be one of the following:
+ * - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
Review Comment:
We need to use html markup here to get a proper bullet point list -- it won't render as bullet points as-is.
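For illustration, a sketch of what this could look like — the same two bullet points from the hunk above expressed with HTML list markup so the javadoc tool renders them as an actual list (exact wording up to the PR author):

```java
/**
 * ...
 * Specifically, the metric should be one of the following:
 * <ul>
 *   <li>{@code Sum}: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.</li>
 *   <li>{@code Gauge}: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.</li>
 * </ul>
 */
```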
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1823,6 +1824,35 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}
+
+ /**
+ * An application metric provided for subscription.
+ * This metric will be added to this client's metrics
+ * that are available for subscription and sent as
+ * telemetry data to the broker.
+ * The provided metric must map to an OTLP metric data point
+ * type in the OpenTelemetry v1 metrics protobuf message types.
+ * Specifically, the metric should be one of the following:
+ * - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ * - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ * Metrics not matching these types are silently ignored.
+ * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
+ *
+ * @param metric The application metric to register
+ */
+ void registerMetricForSubscription(KafkaMetric metric);
+
+ /**
+ * An application to be removed from subscription.
Review Comment:
Similar to `register` method (also `application [metric]`)
##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamsThreadMetricsDelegatingReporter.class);
+ private static final String THREAD_ID_TAG = "thread-id";
+ private final Consumer<byte[], byte[]> consumer;
+ private final String threadId;
+ private final String stateUpdaterThreadId;
+
+
+ public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+ this.consumer = Objects.requireNonNull(consumer);
+ this.threadId = Objects.requireNonNull(threadId);
+ this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId);
+ log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId);
+ }
+
+ @Override
+ public void init(final List<KafkaMetric> metrics) {
+ metrics.forEach(this::metricChange);
+ }
+
+ @Override
+ public void metricChange(final KafkaMetric metric) {
+ if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
+ log.debug("Registering metric {}", metric.metricName());
+ consumer.registerMetricForSubscription(metric);
+ }
+ }
+
+ boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
Review Comment:
`private` ?
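The body of `tagMatchStreamOrStateUpdaterThreadId` is cut off in the hunk above. As a rough standalone sketch of the filtering the reporter needs (not the PR's actual implementation — the real method takes a `KafkaMetric`; this version filters on the metric's tag map, using the `THREAD_ID_TAG` constant and the two thread-id fields shown in the diff):

```java
import java.util.Map;

// Sketch only: a metric belongs to this stream thread if its "thread-id" tag
// matches either the processing thread or the state-updater thread.
public class ThreadTagFilter {
    private static final String THREAD_ID_TAG = "thread-id";

    private final String threadId;
    private final String stateUpdaterThreadId;

    public ThreadTagFilter(final String threadId, final String stateUpdaterThreadId) {
        this.threadId = threadId;
        this.stateUpdaterThreadId = stateUpdaterThreadId;
    }

    public boolean matches(final Map<String, String> tags) {
        // tags.get returns null for untagged metrics; equals on the known
        // thread ids handles that safely.
        final String tag = tags.get(THREAD_ID_TAG);
        return threadId.equals(tag) || stateUpdaterThreadId.equals(tag);
    }
}
```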
##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetry;
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(600)
+@Tag("integration")
+public class KafkaStreamsTelemetryIntegrationTest {
+
+ private static EmbeddedKafkaCluster cluster;
+ private String appId;
+ private String inputTopicTwoPartitions;
+ private String outputTopicTwoPartitions;
+ private String inputTopicOnePartition;
+ private String outputTopicOnePartition;
+ private final List<Properties> streamsConfigurations = new ArrayList<>();
+ private static final List<MetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
+ private static final List<TestingMetricsInterceptingAdminClient> INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>();
+ private static final int NUM_BROKERS = 1;
+ private static final int FIRST_INSTANCE_CONSUMER = 0;
+ private static final int SECOND_INSTANCE_CONSUMER = 1;
+
+ @BeforeAll
+ public static void startCluster() throws IOException {
+ final Properties properties = new Properties();
 properties.put("metric.reporters", TestingClientTelemetry.class.getName());
+ cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
+ cluster.start();
+ }
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
+ appId = safeUniqueTestName(testInfo);
+ inputTopicTwoPartitions = appId + "-input-two";
+ outputTopicTwoPartitions = appId + "-output-two";
+ inputTopicOnePartition = appId + "-input-one";
+ outputTopicOnePartition = appId + "-output-one";
+ cluster.createTopic(inputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(outputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(inputTopicOnePartition, 1, 1);
+ cluster.createTopic(outputTopicOnePartition, 1, 1);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ INTERCEPTING_CONSUMERS.clear();
+ INTERCEPTING_ADMIN_CLIENTS.clear();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+ streamsConfigurations.clear();
+ }
+
+ @Test
+ @DisplayName("Calling unregisterMetric on metrics not registered should not cause an error")
+ public void shouldNotThrowExceptionWhenRemovingNonExistingMetrics() throws InterruptedException {
Review Comment:
Do we need to test this with a heavy-weight integration test? Or could we unit-test this in a more lightweight fashion?
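The contract under test here (unregistering a never-registered metric is a benign no-op) is indeed unit-testable without a cluster. A minimal standalone sketch of that behavior, using a hypothetical registry rather than the PR's classes:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a client's metric-subscription registry,
// illustrating that removal of an unknown entry must not throw.
public class SubscriptionRegistry {
    private final Set<String> registered = new HashSet<>();

    public void register(final String metricName) {
        registered.add(metricName);
    }

    public void unregister(final String metricName) {
        // Set.remove on an absent element is already a benign no-op.
        registered.remove(metricName);
    }

    public boolean isRegistered(final String metricName) {
        return registered.contains(metricName);
    }
}
```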
##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetry;
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(600)
+@Tag("integration")
+public class KafkaStreamsTelemetryIntegrationTest {
+
+ private static EmbeddedKafkaCluster cluster;
+ private String appId;
+ private String inputTopicTwoPartitions;
+ private String outputTopicTwoPartitions;
+ private String inputTopicOnePartition;
+ private String outputTopicOnePartition;
+ private final List<Properties> streamsConfigurations = new ArrayList<>();
+ private static final List<MetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
+ private static final List<TestingMetricsInterceptingAdminClient> INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>();
+ private static final int NUM_BROKERS = 1;
+ private static final int FIRST_INSTANCE_CONSUMER = 0;
+ private static final int SECOND_INSTANCE_CONSUMER = 1;
+
+ @BeforeAll
+ public static void startCluster() throws IOException {
+ final Properties properties = new Properties();
 properties.put("metric.reporters", TestingClientTelemetry.class.getName());
+ cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
+ cluster.start();
+ }
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
+ appId = safeUniqueTestName(testInfo);
+ inputTopicTwoPartitions = appId + "-input-two";
+ outputTopicTwoPartitions = appId + "-output-two";
+ inputTopicOnePartition = appId + "-input-one";
+ outputTopicOnePartition = appId + "-output-one";
+ cluster.createTopic(inputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(outputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(inputTopicOnePartition, 1, 1);
+ cluster.createTopic(outputTopicOnePartition, 1, 1);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ INTERCEPTING_CONSUMERS.clear();
+ INTERCEPTING_ADMIN_CLIENTS.clear();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+ streamsConfigurations.clear();
+ }
+
+ @Test
+ @DisplayName("Calling unregisterMetric on metrics not registered should not cause an error")
+ public void shouldNotThrowExceptionWhenRemovingNonExistingMetrics() throws InterruptedException {
+ final Properties properties = props(true);
+ final Topology topology = complexTopology();
+ try (final KafkaStreams streams = new KafkaStreams(topology, properties)) {
+ streams.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams never transitioned to a RUNNING state.");
+
+ final Consumer<?, ?> embeddedConsumer = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER);
+ final MetricName metricName = new MetricName("fakeMetric", "fakeGroup", "It's a fake metric", new HashMap<>());
+ final KafkaMetric nonExitingMetric = new KafkaMetric(new Object(), metricName, (Measurable) (m, now) -> 1.0, new MetricConfig(), Time.SYSTEM);
+ assertDoesNotThrow(() -> embeddedConsumer.unregisterMetricFromSubscription(nonExitingMetric));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("singleAndMultiTaskParameters")
+ @DisplayName("Streams metrics should get passed to Consumer")
+ public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws InterruptedException {
+ final Properties properties = props(stateUpdaterEnabled);
+ final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+ /*
+ This test verifies that all Kafka Streams metrics with a thread-id tag get passed to the consumer
+ */
+ try (final KafkaStreams streams = new KafkaStreams(topology, properties)) {
+ streams.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams never transitioned to a RUNNING state.");
+
+ final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+
+ final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+
+
+
+ final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+ final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CONSUMER).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+
+
+ assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
+ consumerPassedStreamThreadMetricNames.forEach(metricName -> assertTrue(streamsThreadMetrics.contains(metricName), "Streams metrics doesn't contain " + metricName));
+
+ assertEquals(streamsClientMetrics.size(), adminPassedStreamClientMetricNames.size());
+ adminPassedStreamClientMetricNames.forEach(metricName -> assertTrue(streamsClientMetrics.contains(metricName), "Client metrics doesn't contain " + metricName));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("multiTaskParameters")
+ @DisplayName("Correct streams metrics should get passed with dynamic membership")
+ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterEnabled) throws InterruptedException {
+ final Properties properties1 = props(stateUpdaterEnabled);
+ properties1.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1");
+ properties1.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks1");
+
+
+ final Properties properties2 = props(stateUpdaterEnabled);
+ properties2.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2");
+ properties2.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2");
+
+ /*
+ This test ensures metrics are registered and removed correctly with Kafka Streams dynamic membership changes
+ */
+
+ final Topology topology = complexTopology();
+ try (final KafkaStreams streamsOne = new KafkaStreams(topology, properties1)) {
+ streamsOne.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING == streamsOne.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams never transitioned to a RUNNING state.");
+
+ final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+ final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER).passedMetrics.stream().map(KafkaMetric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+
+ /*
+ With only one instance, Kafka Streams should register task metrics for all tasks 0_0, 0_1, 1_0, 1_1
+ */
+ final List<String> streamTaskIds = getTaskIdsAsStrings(streamsOne);
+ final long consumerPassedTaskMetricCount = consumerPassedStreamTaskMetricNames.stream().filter(metricName -> streamTaskIds.contains(metricName.tags().get("task-id"))).count();
+ assertEquals(streamsTaskMetricNames.size(), consumerPassedStreamTaskMetricNames.size());
+ assertEquals(consumerPassedTaskMetricCount, streamsTaskMetricNames.size());
+
+
+ try (final KafkaStreams streamsTwo = new KafkaStreams(topology, properties2)) {
+ streamsTwo.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams one or two never transitioned to a RUNNING state.");
+
+ /*
+ Now with 2 instances, the tasks will get split amongst both Kafka Streams applications
+ */
+ final List<String> streamOneTaskIds = getTaskIdsAsStrings(streamsOne);
+ final List<String> streamTwoTasksIds = getTaskIdsAsStrings(streamsTwo);
+
+ final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+
+ final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+
+ final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+
+ final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CONSUMER)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CONSUMER)
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+ /*
+ Confirm pre-existing KafkaStreams instance one only passes metrics for its tasks and has no metrics for previous tasks
+ */
+ final long consumerOneStreamOneTaskCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerOneStateMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerOneTaskTwoMetricCount = consumerOnePassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerOneStateTwoMetricCount = consumerOnePassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+
+ /*
+ Confirm new KafkaStreams instance only passes metrics for the newly assigned tasks
+ */
+ final long consumerTwoStreamTwoTaskCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerTwoStateMetricCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamTwoTasksIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerTwoTaskOneMetricCount = consumerTwoPassedTaskMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+ final long consumerTwoStateMetricOneCount = consumerTwoPassedStateMetrics.stream().filter(metricName -> streamOneTaskIds.contains(metricName.tags().get("task-id"))).count();
+
+ assertEquals(streamsOneTaskMetrics.size(), consumerOneStreamOneTaskCount);
+ assertEquals(streamsOneStateMetrics.size(), consumerOneStateMetricCount);
+ assertEquals(0, consumerOneTaskTwoMetricCount);
+ assertEquals(0, consumerOneStateTwoMetricCount);
+
+ assertEquals(streamsTwoTaskMetrics.size(), consumerTwoStreamTwoTaskCount);
+ assertEquals(streamsTwoStateMetrics.size(), consumerTwoStateMetricCount);
+ assertEquals(0, consumerTwoTaskOneMetricCount);
+ assertEquals(0, consumerTwoStateMetricOneCount);
+ }
+ }
+ }
+
+ @Test
+ @DisplayName("Streams metrics should not be visible in consumer metrics")
Review Comment:
Sounds unit-testable via `StreamThreadTest` ?
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1823,6 +1824,35 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}
+
+ /**
+ * An application metric provided for subscription.
Review Comment:
Sound more like a description of the parameter than the method?
Maybe:
```
Add the provided application metric for subscription.
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -556,10 +562,13 @@ static KafkaAdminClient createInternal(
time,
1,
(int) TimeUnit.HOURS.toMillis(1),
+ null,
Review Comment:
nit: indentation
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -556,10 +562,13 @@ static KafkaAdminClient createInternal(
time,
1,
(int) TimeUnit.HOURS.toMillis(1),
+ null,
metadataManager.updater(),
- (hostResolver == null) ? new DefaultHostResolver() : hostResolver);
+ (hostResolver == null) ? new DefaultHostResolver() : hostResolver,
+ null,
Review Comment:
indentation
##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -1823,6 +1824,35 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}
+
+ /**
+ * An application metric provided for subscription.
+ * This metric will be added to this client's metrics
+ * that are available for subscription and sent as
+ * telemetry data to the broker.
+ * The provided metric must map to an OTLP metric data point
+ * type in the OpenTelemetry v1 metrics protobuf message types.
+ * Specifically, the metric should be one of the following:
+ * - `Sum`: Monotonic total count meter (Counter). Suitable for metrics like total number of X, e.g., total bytes sent.
+ * - `Gauge`: Non-monotonic current value meter (UpDownCounter). Suitable for metrics like current value of Y, e.g., current queue count.
+ * Metrics not matching these types are silently ignored.
+ * Executing this method for a previously registered metric is a benign operation and results in updating that metrics entry.
+ *
+ * @param metric The application metric to register
+ */
+ void registerMetricForSubscription(KafkaMetric metric);
+
+ /**
+ * An application to be removed from subscription.
+ * This metric is removed from this client's metrics
+ * and will not be available for subscription any longer.
+ * Executing this method with a metric that has not been registered is a
+ * benign operation and does not result in any action taken (no-op)
Review Comment:
```suggestion
* benign operation and does not result in any action taken (no-op).
```
##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsClientMetricsDelegatingReporter.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class StreamsClientMetricsDelegatingReporter implements MetricsReporter {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamsClientMetricsDelegatingReporter.class);
+ private final Admin adminClient;
+
+ public StreamsClientMetricsDelegatingReporter(final Admin adminClient) {
+ this.adminClient = Objects.requireNonNull(adminClient);
+ log.debug("Creating Client Metrics reporter for admin client {}", adminClient);
+ }
+
+ @Override
+ public void init(final List<KafkaMetric> metrics) {
+ metrics.forEach(this::metricChange);
+ }
+
+ @Override
+ public void metricChange(final KafkaMetric metric) {
+ if (isStreamsClientMetric(metric)) {
+ log.debug("Registering metric {}", metric.metricName());
+ adminClient.registerMetricForSubscription(metric);
+ }
+ }
+
+ boolean isStreamsClientMetric(final KafkaMetric metric) {
Review Comment:
Should this be `private` ?
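The body of the predicate is elided from this diff; a hypothetical sketch of what such a filter might look like, assuming client-level Streams metrics can be identified by their metric group name (the group string here is an assumption, not taken from the PR):

```java
// Hypothetical body -- the real implementation is not shown in this diff.
// Assumes Kafka Streams client-level metrics share a dedicated group name.
boolean isStreamsClientMetric(final KafkaMetric metric) {
    final boolean isStreamsClientMetric = "stream-metrics".equals(metric.metricName().group());
    if (isStreamsClientMetric) {
        log.trace("Streams client metric to add: {}", metric.metricName());
    }
    return isStreamsClientMetric;
}
```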
##########
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetry;
+import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
+import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.apache.kafka.streams.KafkaClientSupplier;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(600)
+@Tag("integration")
+public class KafkaStreamsTelemetryIntegrationTest {
+
+ private static EmbeddedKafkaCluster cluster;
+ private String appId;
+ private String inputTopicTwoPartitions;
+ private String outputTopicTwoPartitions;
+ private String inputTopicOnePartition;
+ private String outputTopicOnePartition;
+ private final List<Properties> streamsConfigurations = new ArrayList<>();
+ private static final List<MetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
+ private static final List<TestingMetricsInterceptingAdminClient> INTERCEPTING_ADMIN_CLIENTS = new ArrayList<>();
+ private static final int NUM_BROKERS = 1;
+ private static final int FIRST_INSTANCE_CONSUMER = 0;
+ private static final int SECOND_INSTANCE_CONSUMER = 1;
+
+ @BeforeAll
+ public static void startCluster() throws IOException {
+ final Properties properties = new Properties();
properties.put("metric.reporters", TestingClientTelemetry.class.getName());
+ cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
+ cluster.start();
+ }
+
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
+ appId = safeUniqueTestName(testInfo);
+ inputTopicTwoPartitions = appId + "-input-two";
+ outputTopicTwoPartitions = appId + "-output-two";
+ inputTopicOnePartition = appId + "-input-one";
+ outputTopicOnePartition = appId + "-output-one";
+ cluster.createTopic(inputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(outputTopicTwoPartitions, 2, 1);
+ cluster.createTopic(inputTopicOnePartition, 1, 1);
+ cluster.createTopic(outputTopicOnePartition, 1, 1);
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ cluster.stop();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ INTERCEPTING_CONSUMERS.clear();
+ INTERCEPTING_ADMIN_CLIENTS.clear();
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+ streamsConfigurations.clear();
+ }
+
+ @Test
@DisplayName("Calling unregisterMetric on metrics not registered should not cause an error")
+ public void shouldNotThrowExceptionWhenRemovingNonExistingMetrics() throws InterruptedException {
+ final Properties properties = props(true);
+ final Topology topology = complexTopology();
+ try (final KafkaStreams streams = new KafkaStreams(topology, properties)) {
+ streams.start();
+ waitForCondition(() -> KafkaStreams.State.RUNNING == streams.state(),
+ IntegrationTestUtils.DEFAULT_TIMEOUT,
+ () -> "Kafka Streams never transitioned to a RUNNING state.");
+
+ final Consumer<?, ?> embeddedConsumer = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CONSUMER);
+ final MetricName metricName = new MetricName("fakeMetric", "fakeGroup", "It's a fake metric", new HashMap<>());
+ final KafkaMetric nonExistingMetric = new KafkaMetric(new Object(), metricName, (Measurable) (m, now) -> 1.0, new MetricConfig(), Time.SYSTEM);
+ assertDoesNotThrow(() -> embeddedConsumer.unregisterMetricFromSubscription(nonExistingMetric));
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("singleAndMultiTaskParameters")
+ @DisplayName("Streams metrics should get passed to Consumer")
Review Comment:
Similar to above? Should we add this to `StreamThreadTest` as unit-test?
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -621,6 +631,9 @@ private KafkaAdminClient(AdminClientConfig config,
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
+ List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config);
+ this.clientTelemetryReporter = clientTelemetryReporter;
+ this.clientTelemetryReporter.ifPresent(reporters::add);
Review Comment:
Should the reporters not be closed when the admin client calls `metrics.close()`? (I would assume it does correctly close its `metrics` object.)
Following up, because I don't see code changes about it yet.
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -547,6 +551,8 @@ static KafkaAdminClient createInternal(
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
metrics = new Metrics(metricConfig, reporters, time, metricsContext);
+ clientTelemetryReporter = CommonClientConfigs.telemetryReporter(clientId, config);
+ clientTelemetryReporter.ifPresent(telemetryReporter -> telemetryReporter.contextChange(metricsContext));
Review Comment:
> do we require them?
Yes, as we want to use `Admin` to report KS instance metrics.
> Secondly, does the admin client have any metrics of its own in Kafka?

I would assume so? Why would we create a `new Metrics` object if we don't use it? (Even though I have to admit, the docs don't say anything about admin metrics... 🤔 -- maybe @cmccabe can help on this question?)
> we might want to consider only enabling client telemetry reporter if
additional metrics are registered. Wdyt?
I guess we could do this -- but it's kinda odd to have the Admin config
"enable push metrics" but not enable it if set to `true`...? -- If we don't
want to enable it on the admin client, we should rather remove (or for now only
deprecate) the existing config, and add a new one via KIP-1076 which would
enable "application push metrics" which would only create a reporter if
application metrics are registered -- otherwise, we end up with some weird
behavior which can easily confuse users.
Btw: I was just looking into the consumer/producer code, and it seems there we just create the reporter and add it to the `reporters` list before `new Metrics` is called -- this way, `metricsContext` will be "added" automatically to the reporter. Should simplify the code a little bit?