junrao commented on code in PR #14699:
URL: https://github.com/apache/kafka/pull/14699#discussion_r1399714279


##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -16,31 +16,421 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;
+import kafka.metrics.ClientMetricsInstance;
+import kafka.metrics.ClientMetricsInstanceMetadata;
+import kafka.metrics.ClientMetricsReceiverPlugin;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.TelemetryTooLargeException;
+import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
+import org.apache.kafka.common.errors.UnknownSubscriptionIdException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
+import org.apache.kafka.common.message.PushTelemetryResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.Crc32C;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 
 /**
  * Handles client telemetry metrics requests/responses, subscriptions and 
instance information.
  */
 public class ClientMetricsManager implements Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(ClientMetricsManager.class);
-    private static final ClientMetricsManager INSTANCE = new 
ClientMetricsManager();
+    private static final List<Byte> SUPPORTED_COMPRESSION_TYPES = 
Collections.unmodifiableList(
+        Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, 
CompressionType.GZIP.id, CompressionType.SNAPPY.id));
+    // Max cache size (16k active client connections per broker)
+    private static final int CM_CACHE_MAX_SIZE = 16384;
+
+    private final Cache<Uuid, ClientMetricsInstance> clientInstanceCache;
+    private final Map<String, SubscriptionInfo> subscriptionMap;
+    private final KafkaConfig config;
+    private final Time time;
+
+    // The latest subscription version is used to determine if subscription 
has changed and needs
+    // to re-evaluate the client instance subscription id as per changed 
subscriptions.
+    private final AtomicInteger subscriptionUpdateVersion;
 
-    public static ClientMetricsManager instance() {
-        return INSTANCE;
+    public ClientMetricsManager(KafkaConfig config, Time time) {
+        this.subscriptionMap = new ConcurrentHashMap<>();
+        this.subscriptionUpdateVersion = new AtomicInteger(0);
+        this.clientInstanceCache = new SynchronizedCache<>(new 
LRUCache<>(CM_CACHE_MAX_SIZE));
+        this.config = config;
+        this.time = time;
     }
 
     public void updateSubscription(String subscriptionName, Properties 
properties) {
-        // TODO: Implement the update logic to manage subscriptions.
+        // Validate the subscription properties.
+        ClientMetricsConfigs.validate(subscriptionName, properties);
+        // IncrementalAlterConfigs API will send empty configs when all the 
configs are deleted
+        // for respective subscription. In that case, we need to remove the 
subscription from the map.
+        if (properties.isEmpty()) {
+            // Remove the subscription from the map if it exists, else ignore 
the config update.
+            if (subscriptionMap.containsKey(subscriptionName)) {
+                log.info("Removing subscription [{}] from the subscription 
map", subscriptionName);
+                subscriptionMap.remove(subscriptionName);
+                this.subscriptionUpdateVersion.incrementAndGet();
+            }
+            return;
+        }
+
+        updateClientSubscription(subscriptionName, new 
ClientMetricsConfigs(properties));
+    }
+
+    public GetTelemetrySubscriptionsResponse 
processGetTelemetrySubscriptionRequest(
+        GetTelemetrySubscriptionsRequest request, RequestContext 
requestContext, int throttleMs) {
+
+        long now = time.milliseconds();
+        Uuid clientInstanceId = 
Optional.ofNullable(request.data().clientInstanceId())
+            .filter(id -> !id.equals(Uuid.ZERO_UUID))
+            .orElse(generateNewClientId());
+
+        /*
+         Get the client instance from the cache or create a new one. If 
subscription has changed
+         since the last request, then the client instance will be 
re-evaluated. Validation of the
+         request will be done after the client instance is created. If client 
issues another get
+         telemetry request prior to push interval, then the client should get 
a throttle error but if
+         the subscription has changed since the last request then the client 
should get the updated
+         subscription immediately.
+        */
+        ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the get request parameters for the client instance.
+            validateGetRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            return request.getErrorResponse(0, exception);
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return createGetSubscriptionResponse(clientInstanceId, clientInstance, 
throttleMs);
+    }
+
+    public PushTelemetryResponse 
processPushTelemetryRequest(PushTelemetryRequest request,
+        RequestContext requestContext, int throttleMs) {
+
+        Uuid clientInstanceId = request.data().clientInstanceId();
+        if (clientInstanceId == null || 
Uuid.RESERVED.contains(clientInstanceId)) {
+            String msg = String.format("Invalid request from the client [%s], 
invalid client instance id",
+                clientInstanceId);
+            return request.getErrorResponse(0, new 
InvalidRequestException(msg));
+        }
+
+        long now = time.milliseconds();
+        ClientMetricsInstance clientInstance = 
clientInstance(clientInstanceId, requestContext);
+
+        try {
+            // Validate the push request parameters for the client instance.
+            validatePushRequest(request, clientInstance, now);
+        } catch (ApiException exception) {
+            log.debug("Error validating push telemetry request from client 
[{}]", clientInstanceId, exception);
+            clientInstance.lastKnownError(Errors.forException(exception));
+            return request.getErrorResponse(0, exception);
+        } finally {
+            // Update the client instance with the latest push request 
parameters.
+            clientInstance.terminating(request.data().terminating());
+        }
+
+        // Push the metrics to the external client receiver plugin.
+        byte[] metrics = request.data().metrics();
+        if (metrics != null && metrics.length > 0) {
+            try {
+                
ClientMetricsReceiverPlugin.instance().exportMetrics(requestContext, request);
+            } catch (Exception exception) {
+                clientInstance.lastKnownError(Errors.INVALID_RECORD);
+                return request.errorResponse(throttleMs, 
Errors.INVALID_RECORD);
+            }
+        }
+
+        clientInstance.lastKnownError(Errors.NONE);
+        return new PushTelemetryResponse(new 
PushTelemetryResponseData().setThrottleTimeMs(throttleMs));

Review Comment:
   Currently, we have a generic request quota (for CPU usage). It mutes the 
channel for `throttleMs` and sets no error code in the response.
   
   We also have additional quotas for specific requests. For example, https://cwiki.apache.org/confluence/display/KAFKA/KIP-599 adds a controller mutation rate quota. It mutes the channel for `throttleMs` and sets the THROTTLING_QUOTA_EXCEEDED error in the response.
   
   For the new throttling introduced in KIP-714, I guess the intention is to do something similar to the controller mutation rate quota. If we want to do that, we need to do the following.
   1. Just setting `throttleMs` in the response won't mute the channel. We need to introduce a new type of `QuotaManager` in `QuotaManagers` and wire the handling of the `pushTelemetry` request to the new `QuotaManager`, so that it can use the generic framework for muting/unmuting the channel, updating the metrics, etc.
   2. The existing throttling is rate-based. It would be useful to map the new throttling in KIP-714 to that model so that we can get consistent throttling-related metrics.
   3. `PushTelemetry` will also be subject to the request quota. So, we need a mechanism to combine the throttle times computed by the two different quota managers. We can follow how `KafkaApis.handleProduceRequest` does that (since it needs to combine the request quota and the produce quota).
   



##########
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##########
@@ -103,9 +103,16 @@ class DynamicConfigPublisher(
               )
             case CLIENT_METRICS =>
               // Apply changes to client metrics subscription.
-              info(s"Updating client metrics subscription ${resource.name()} 
with new configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              
dynamicConfigHandlers(ConfigType.ClientMetrics).processConfigChanges(resource.name(),
 props)
+              
dynamicConfigHandlers.get(ConfigType.ClientMetrics).foreach(metricsConfigHandler
 =>
+                try {
+                  info(s"Updating client metrics ${resource.name()} with new 
configuration : " +
+                    toLoggableProps(resource, props).mkString(","))

Review Comment:
   `toLoggableProps` treats properties as sensitive by default. Should we add support for the `CLIENT_METRICS` config type in `KafkaConfig.loggableValue`?



##########
core/src/main/java/kafka/server/ClientMetricsManager.java:
##########
@@ -16,31 +16,420 @@
  */
 package kafka.server;
 
+import kafka.metrics.ClientMetricsConfigs;

Review Comment:
   With https://issues.apache.org/jira/browse/KAFKA-15853, we now have a new server module. So, we want to move this class and all classes in core/src/main/java/kafka/metrics there. This could be done in a follow-up JIRA.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to