junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1399714279

##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class);
-    private static final ClientMetricsManager INSTANCE = new ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+    private final KafkaConfig config;
+    private final Time time;
+
+    // The latest subscription version is used to determine if subscription has changed and needs
+    // to re-evaluate the client instance subscription id as per changed subscriptions.
+    private final AtomicInteger subscriptionUpdateVersion;
 
-    public static ClientMetricsManager instance() {
-        return INSTANCE;
+    public ClientMetricsManager(KafkaConfig config, Time time) {
+        this.subscriptionMap = new ConcurrentHashMap<>();
+        this.subscriptionUpdateVersion = new AtomicInteger(0);
+        this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE));
+        this.config = config;
+        this.time = time;
     }
 
     public void updateSubscription(String subscriptionName, Properties properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // Validate the subscription properties.
+        ClientMetricsConfigs.validate(subscriptionName, properties);
+        // IncrementalAlterConfigs API will send empty configs when all the configs are deleted
+        // for respective subscription. In that case, we need to remove the subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                this.subscriptionUpdateVersion.incrementAndGet();
+            }
+            return;
+        }
+
+        updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties));
+    }
+
+    public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, RequestContext requestContext, int throttleMs) {
+
+        long now = time.milliseconds();
+        Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If subscription has changed
+         since the last request, then the client instance will be re-evaluated. Validation of the
+         request will be done after the client instance is created. If client issues another get
+         telemetry request prior to push interval, then the client should get a throttle error but if
+         the subscription has changed since the last request then the client should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(0, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, throttleMs);
+    }
+
+    public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request,
+        RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) {
+            String msg = String.format("Invalid request from the client [%s], invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(0, new InvalidRequestException(msg));
+        }
+
+        long now = time.milliseconds();
+        ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client [{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(0, exception);
+        } finally {
+            // Update the client instance with the latest push request parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(throttleMs, Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new PushTelemetryResponseData().setThrottleTimeMs(throttleMs));

Review Comment:
   Currently, we have a generic request quota (for CPU usage). It mutes the channel for `throttleMs` and sets no error code in the response. We also have additional quotas for specific requests. For example, https://cwiki.apache.org/confluence/display/KAFKA/KIP-599 adds a controller mutation rate quota. It mutes the channel for `throttleMs` and sets a THROTTLING_QUOTA_EXCEEDED error in the response.

   For the new throttling introduced in KIP-714, I guess the intention is to do something similar to the controller mutation rate quota. If we want to do that, we need to do the following.

   1. Just setting `throttleMs` in the response won't mute the channel. We need to introduce a new type of `QuotaManager` in `QuotaManagers` and wire the handling of the `pushTelemetry` request to the new `QuotaManager` so that it can use the generic framework for muting/unmuting the channel, updating the metrics, etc.
   2. The existing throttling is rate based. It would be useful to map the new throttling in KIP-714 to that model so that we get consistent throttling-related metrics.
   3. `PushTelemetry` will also be subject to the request quota. So, we need a mechanism to combine the throttle times from the two quota managers. We can follow how `KafkaApis.handleProduceRequest` does that (since it needs to combine the request quota and the produce quota); a rough sketch of the combining idea is included below.
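   To make point 3 concrete, here is a minimal, self-contained sketch of the combining logic under the assumptions above. It is not the actual broker code: the `ThrottleSource` trait and the `telemetryQuota` / `requestQuota` / `throttle` names are hypothetical placeholders for whatever the new telemetry quota manager and the existing request quota manager expose.

   ```scala
   // Hypothetical sketch: combine the throttle times from two quotas and enforce only the
   // larger one, so the channel is muted once and the response carries a single throttle time.
   trait ThrottleSource {
     // Throttle time in ms this quota would impose for the current request (0 if not throttled).
     def maybeRecordAndGetThrottleTimeMs(): Int
     // Mute the channel and record throttle metrics for this quota.
     def throttle(throttleTimeMs: Int): Unit
   }

   object PushTelemetryThrottleSketch {
     def combinedThrottleTimeMs(telemetryQuota: ThrottleSource, requestQuota: ThrottleSource): Int = {
       val telemetryThrottleMs = telemetryQuota.maybeRecordAndGetThrottleTimeMs()
       val requestThrottleMs = requestQuota.maybeRecordAndGetThrottleTimeMs()
       val maxThrottleMs = math.max(telemetryThrottleMs, requestThrottleMs)

       if (maxThrottleMs > 0) {
         // Only the quota that imposes the larger delay mutes the channel, mirroring how the
         // produce path picks between the produce quota and the request quota.
         if (telemetryThrottleMs > requestThrottleMs) telemetryQuota.throttle(maxThrottleMs)
         else requestQuota.throttle(maxThrottleMs)
       }
       // The combined value is what would be set as throttleTimeMs in the PushTelemetry response.
       maxThrottleMs
     }
   }
   ```

   In the real patch this logic would live in the `pushTelemetry` handling path, with the existing request quota manager supplying the request-quota side of the comparison.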

##########
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##########
@@ -103,9 +103,16 @@ class DynamicConfigPublisher(
           )
         case CLIENT_METRICS =>
           // Apply changes to client metrics subscription.
-          info(s"Updating client metrics subscription ${resource.name()} with new configuration : " +
-            toLoggableProps(resource, props).mkString(","))
-          dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(), props)
+          dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler =>
+            try {
+              info(s"Updating client metrics ${resource.name()} with new configuration : " +
+                toLoggableProps(resource, props).mkString(","))

Review Comment:
   `toLoggableProps` by default treats the property as sensitive. Should we add support for the Client_Metrics type in `KafkaConfig.loggableValue`?

##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -16,31 +16,420 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;

Review Comment:
   With https://issues.apache.org/jira/browse/KAFKA-15853, we now have a new server module. So, we want to move this class and all classes in core/src/java/metrics there. This could be done in a follow-up JIRA.