mjsax commented on code in PR #14724: URL: https://github.com/apache/kafka/pull/14724#discussion_r1408805454
########## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + + public static final String DOMAIN = "org.apache.kafka"; + // Client metrics tags + public static final String CLIENT_RACK = "client_rack"; + public static final String GROUP_ID = "group_id"; + public static final String GROUP_INSTANCE_ID = "group_instance_id"; + public static final String GROUP_MEMBER_ID = "group_member_id"; + public static final String TRANSACTIONAL_ID = "transactional_id"; + + private static final String PRODUCER_NAMESPACE = "kafka.producer"; + private static final String 
CONSUMER_NAMESPACE = "kafka.consumer"; + + private static final Map<String, String> PRODUCER_CONFIG_MAPPING = new HashMap<>(); + private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>(); + + private volatile Resource resource = null; + private Map<String, ?> config = null; + + static { + PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID); + + CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID); + CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID); Review Comment: It seems we should add `group.member.id` there, too? ########## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ########## @@ -0,0 +1,937 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.metrics.v1.Metric; +import io.opentelemetry.proto.metrics.v1.MetricsData; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.metrics.v1.ScopeMetrics; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsContext; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequest.Builder; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest; +import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse; +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.PushTelemetryResponse; +import org.apache.kafka.common.telemetry.ClientTelemetryState; +import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender; +import org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricKeyable; +import org.apache.kafka.common.telemetry.internals.MetricsCollector; +import org.apache.kafka.common.telemetry.internals.MetricsEmitter; +import org.apache.kafka.common.telemetry.internals.SinglePointMetric; +import org.apache.kafka.common.telemetry.internals.TelemetryMetricNamingConvention; +import org.apache.kafka.common.utils.Time; +import 
org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; + +/** + * The implementation of the {@link MetricsReporter} for client telemetry which manages the life-cycle + * of the client telemetry collection process. The client telemetry reporter is responsible for + * collecting the client telemetry data and sending it to the broker. + * <p> + * + * The client telemetry reporter is configured with a {@link ClientTelemetrySender} which is + * responsible for sending the client telemetry data to the broker. The client telemetry reporter + * will attempt to fetch the telemetry subscription information from the broker and send the + * telemetry data to the broker based on the subscription information. + * <p> + * + * The full life-cycle of the metric collection process is defined by a state machine in + * {@link ClientTelemetryState}. Each state is associated with a different set of operations. + * For example, the client telemetry reporter will attempt to fetch the telemetry subscription + * from the broker when in the {@link ClientTelemetryState#SUBSCRIPTION_NEEDED} state. + * If the push operation fails, the client telemetry reporter will attempt to re-fetch the + * subscription information by setting the state back to {@link ClientTelemetryState#SUBSCRIPTION_NEEDED}. 
+ * <p> + * + * In an unlikely scenario, if a bad state transition is detected, an + * {@link IllegalStateException} will be thrown. + * <p> + * + * The state transition follows the following steps in order: + * <ol> + * <li>{@link ClientTelemetryState#SUBSCRIPTION_NEEDED}</li> + * <li>{@link ClientTelemetryState#SUBSCRIPTION_IN_PROGRESS}</li> + * <li>{@link ClientTelemetryState#PUSH_NEEDED}</li> + * <li>{@link ClientTelemetryState#PUSH_IN_PROGRESS}</li> + * <li>{@link ClientTelemetryState#TERMINATING_PUSH_NEEDED}</li> + * <li>{@link ClientTelemetryState#TERMINATING_PUSH_IN_PROGRESS}</li> + * <li>{@link ClientTelemetryState#TERMINATED}</li> + * </ol> + * <p> + * + * For more detail in state transition, see {@link ClientTelemetryState#validateTransition}. + */ +public class ClientTelemetryReporter implements MetricsReporter { + + private static final Logger log = LoggerFactory.getLogger(ClientTelemetryReporter.class); + + public static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000; + + private final List<MetricsCollector> collectors; + private final ClientTelemetryProvider telemetryProvider; + + private final ClientTelemetrySender clientTelemetrySender; + private final Time time; + private Map<String, Object> rawOriginalConfig; + private KafkaMetricsCollector kafkaMetricsCollector; + + public ClientTelemetryReporter(Time time) { + this.time = time; + collectors = new CopyOnWriteArrayList<>(); + telemetryProvider = new ClientTelemetryProvider(); + clientTelemetrySender = new DefaultClientTelemetrySender(); + } + + @SuppressWarnings("unchecked") + @Override + public synchronized void configure(Map<String, ?> configs) { + rawOriginalConfig = (Map<String, Object>) Objects.requireNonNull(configs); + } + + @Override + public synchronized void contextChange(MetricsContext metricsContext) { + /* + If validation succeeds: initialize the provider, start the metric collection task, + set metrics labels for services/libraries that expose metrics. 
+ */ + Objects.requireNonNull(rawOriginalConfig, "configure() was not called before contextChange()"); + collectors.forEach(MetricsCollector::stop); + + if (!telemetryProvider.validate(metricsContext)) { + log.warn("Validation failed for {} context {}, skip starting collectors. Metrics collection is disabled", + telemetryProvider.getClass(), metricsContext.contextLabels()); + return; + } + + if (kafkaMetricsCollector == null) { + // Initialize the provider only once. contextChange(..) can be called more than once, + // but once it's been initialized and all necessary labels are present then we don't + // re-initialize. + telemetryProvider.configure(rawOriginalConfig); + } + + telemetryProvider.contextChange(metricsContext); + + if (kafkaMetricsCollector == null) { + initCollectors(); + } + } + + @Override + public void init(List<KafkaMetric> metrics) { + // metrics collector may not have been initialized (e.g. invalid context labels) + // in which case metrics collection is disabled + if (kafkaMetricsCollector != null) { + kafkaMetricsCollector.init(metrics); + } + } + + /** + * Method is invoked whenever a metric is added/registered + */ + @Override + public void metricChange(KafkaMetric metric) { + // metrics collector may not have been initialized (e.g. invalid context labels) + // in which case metrics collection is disabled + if (kafkaMetricsCollector != null) { + kafkaMetricsCollector.metricChange(metric); + } + } + + /** + * Method is invoked whenever a metric is removed + */ + @Override + public void metricRemoval(KafkaMetric metric) { + // metrics collector may not have been initialized (e.g. 
invalid context labels) + // in which case metrics collection is disabled + if (kafkaMetricsCollector != null) { + kafkaMetricsCollector.metricRemoval(metric); + } + } + + @Override + public void close() { + log.debug("Stopping ClientTelemetryReporter"); + try { + clientTelemetrySender.close(); + } catch (Exception exception) { + log.error("Failed to close client telemetry reporter", exception); + } + } + + public synchronized void updateMetricsLabels(Map<String, String> labels) { Review Comment: I cannot find where this is called, and thus it's not clear to me why it's added (similar for `telemetryProvider.updateLabels(...)` which is only called from here as it seems) ########## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + + public static final String DOMAIN = "org.apache.kafka"; + // Client metrics tags + public static final String CLIENT_RACK = "client_rack"; + public static final String GROUP_ID = "group_id"; + public static final String GROUP_INSTANCE_ID = "group_instance_id"; + public static final String TRANSACTIONAL_ID = "transactional_id"; + + private static final Map<String, String> PRODUCER_CONFIG_MAPPING = new HashMap<>(); + private static final Map<String, String> CONSUMER_CONFIG_MAPPING = new HashMap<>(); + + private Resource resource = null; + private Map<String, ?> config = null; + + static { + PRODUCER_CONFIG_MAPPING.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, ClientTelemetryProvider.TRANSACTIONAL_ID); + + CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_ID_CONFIG, ClientTelemetryProvider.GROUP_ID); + CONSUMER_CONFIG_MAPPING.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ClientTelemetryProvider.GROUP_INSTANCE_ID); + } + + @Override + public synchronized void configure(Map<String, ?> configs) { + this.config = configs; + } + + /** + * Validate that all the data required for generating correct metrics is present. The provider + * will be disabled if validation fails. 
+ * + * @param metricsContext {@link MetricsContext} + * @return false if all the data required for generating correct metrics is missing, true + * otherwise. + */ + public boolean validate(MetricsContext metricsContext, Map<String, ?> config) { + // metric collection will be disabled for clients without a client id (e.g. transient admin clients) + return ClientTelemetryUtils.validateResourceLabel(config, CommonClientConfigs.CLIENT_ID_CONFIG) && + ClientTelemetryUtils.validateRequiredResourceLabels(metricsContext.contextLabels()); + } + + /** + * Sets the metrics tags for the service or library exposing metrics. This will be called before + * {@link org.apache.kafka.common.metrics.MetricsReporter#init(List)} and may be called anytime + * after that. + * + * @param metricsContext {@link MetricsContext} + */ + public void contextChange(MetricsContext metricsContext) { + final Resource.Builder resourceBuilder = Resource.newBuilder(); + + final String namespace = metricsContext.contextLabels().get(MetricsContext.NAMESPACE); + if (KafkaProducer.JMX_PREFIX.equals(namespace)) { + // Add producer resource labels. + PRODUCER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> { + if (config.containsKey(configKey)) { + addAttribute(resourceBuilder, telemetryKey, String.valueOf(config.get(configKey))); + } + }); + } else if (ConsumerUtils.CONSUMER_JMX_PREFIX.equals(namespace)) { + // Add consumer resource labels. + CONSUMER_CONFIG_MAPPING.forEach((configKey, telemetryKey) -> { + if (config.containsKey(configKey)) { + addAttribute(resourceBuilder, telemetryKey, String.valueOf(config.get(configKey))); + } + }); + } + + // Add client rack label. + if (config.containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG)) { Review Comment: Ah. I think I read the code incorrectly. Thanks. All good. 
########## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryProvider.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients; + +import io.opentelemetry.proto.common.v1.AnyValue; +import io.opentelemetry.proto.common.v1.KeyValue; +import io.opentelemetry.proto.resource.v1.Resource; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.metrics.MetricsContext; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClientTelemetryProvider implements Configurable { + + public static final String DOMAIN = "org.apache.kafka"; + // Client metrics tags + public static final String CLIENT_RACK = "client_rack"; + public static final String GROUP_ID = "group_id"; + public static final String GROUP_INSTANCE_ID = "group_instance_id"; + public static final String TRANSACTIONAL_ID = "transactional_id"; Review Comment: The `if any` is mentioned on the KIP for 
others, too; that's why I found it confusing. -- Thanks for adding. I am not sure, though, why more changes need to go into a follow-up PR. Can you elaborate? ########## clients/src/main/java/org/apache/kafka/clients/ClientTelemetryReporter.java: ########## @@ -550,6 +576,41 @@ public void close() { } } + @Override + public void initiateClose(long timeoutMs) { + log.debug("initiate close for client telemetry, check if terminal push required. Timeout {} ms.", timeoutMs); + + lock.writeLock().lock(); + try { + // If we never fetched a subscription, we can't really push anything. + if (lastRequestMs == 0) { + log.info("Telemetry subscription not loaded, not attempting terminating push"); Review Comment: This is `info` while others below are `debug`. Why? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
