junrao commented on code in PR #15251:
URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472


##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst
         ClientMetricsInstanceMetadata instanceMetadata) {
 
         ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata);
+        // Maybe add client metrics, if metrics not already added. Metrics might be already added
+        // if the client instance was evicted from the cache because of size limit.

Review Comment:
   Hmm, if we evict a client instance from the LRU cache, should we also remove the corresponding metrics?
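A minimal sketch of what removing the metrics on eviction could look like. It assumes the cache is an access-ordered `LinkedHashMap` (a hypothetical stand-in for the broker's LRU cache) and uses a plain `Set<String>` in place of Kafka's `Metrics` registry; real code would presumably call `Metrics#removeSensor(String)` for each per-instance sensor name. The class and method names are illustrative, not from the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an LRU cache of client instances whose eviction also
// drops the evicted instance's per-instance sensors.
final class EvictingClientInstanceCache<V> {

    // Sensor name prefixes, mirroring the constants in the diff above.
    static final List<String> SENSOR_PREFIXES = List.of(
        "ClientMetricsUnknownSubscriptionRequest", "ClientMetricsThrottle",
        "ClientMetricsPluginExport", "ClientMetricsPluginError",
        "ClientMetricsPluginExportTime");

    // Stand-in for the metrics registry; real code would register with
    // Metrics#sensor(name) and unregister with Metrics#removeSensor(name).
    final Set<String> registeredSensors = ConcurrentHashMap.newKeySet();

    private final Map<UUID, V> cache;

    EvictingClientInstanceCache(int maxSize) {
        // accessOrder = true: iteration order is least-recently-used first,
        // so the "eldest" entry is the LRU victim.
        this.cache = new LinkedHashMap<UUID, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<UUID, V> eldest) {
                if (size() > maxSize) {
                    // Drop the evicted instance's sensors together with the entry.
                    removeInstanceSensors(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    synchronized void put(UUID clientInstanceId, V instance) {
        addInstanceSensors(clientInstanceId);
        cache.put(clientInstanceId, instance);
    }

    synchronized V get(UUID clientInstanceId) {
        return cache.get(clientInstanceId);
    }

    synchronized int size() {
        return cache.size();
    }

    private void addInstanceSensors(UUID id) {
        for (String prefix : SENSOR_PREFIXES) {
            registeredSensors.add(prefix + "-" + id);
        }
    }

    private void removeInstanceSensors(UUID id) {
        for (String prefix : SENSOR_PREFIXES) {
            registeredSensors.remove(prefix + "-" + id);
        }
    }
}
```

Because the sensor names are derived from the client instance id, the eviction hook can rebuild them without extra bookkeeping; if an evicted client comes back, its sensors are simply registered again on the next `put`.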



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -493,4 +519,123 @@ public void run() {
             }
         }
     }
+
+    // Visible for testing
+    final class ClientMetricsStats {
+
+        private static final String GROUP_NAME = "ClientMetrics";
+
+        // Visible for testing
+        static final String INSTANCE_COUNT = "ClientMetricsInstanceCount";
+        static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest";
+        static final String THROTTLE = "ClientMetricsThrottle";
+        static final String PLUGIN_EXPORT = "ClientMetricsPluginExport";
+        static final String PLUGIN_ERROR = "ClientMetricsPluginError";
+        static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime";
+
+        // Names of sensors that are registered through client instances.
+        private final Set<String> sensorsName = ConcurrentHashMap.newKeySet();
+        // List of metric names which are not specific to a client instance. Do not require thread
+        // safe structure as it will be populated only in constructor.
+        private final List<MetricName> registeredMetricNames = new ArrayList<>();
+
+        private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST,
+            THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet());
+
+        ClientMetricsStats() {
+            Measurable instanceCount = (config, now) -> clientInstanceCache.size();
+            MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME,
+                "The current number of client metric instances being managed by the broker");
+            metrics.addMetric(instanceCountMetric, instanceCount);
+            registeredMetricNames.add(instanceCountMetric);
+        }
+
+        public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) {
+            // If one sensor of the metrics has been registered for the client instance,
+            // then all other sensors should have been registered; and vice versa.
+            if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) {
+                return;
+            }
+
+            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+
+            Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId);
+            unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags));
+            sensorsName.add(unknownSubscriptionRequestCountSensor.name());
+
+            Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
+            throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));
+            sensorsName.add(throttleCount.name());
+
+            Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId);
+            pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags));
+            sensorsName.add(pluginExportCount.name());
+
+            Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" + clientInstanceId);
+            pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags));
+            sensorsName.add(pluginErrorCount.name());
+
+            Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId);
+            pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg",
+                ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg());
+            pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max",
+                ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max());
+            sensorsName.add(pluginExportTime.name());
+        }
+
+        public void recordUnknownSubscriptionCount(Uuid clientInstanceId) {
+            record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId);
+        }
+
+        public void recordThrottleCount(Uuid clientInstanceId) {
+            record(THROTTLE, clientInstanceId);
+        }
+
+        public void recordPluginExport(Uuid clientInstanceId, long timeMs) {
+            record(PLUGIN_EXPORT, clientInstanceId);
+            record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs);

Review Comment:
   > I might be wrong, but we have record method on sensor where PLUGIN_EXPORT sensor records a value of 1 on every invocation but PLUGIN_EXPORT_TIME needs to record time
   
   Hmm, but that's what WindowedCount does. It always records a value of 1, independent of the value passed in. The passed-in value can then be used to calculate avg and max, so a single sensor could carry all three stats.
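   ```java
   package org.apache.kafka.common.metrics.stats;

   import org.apache.kafka.common.metrics.MetricConfig;

   // Counts events: every recorded value contributes 1.0, whatever its magnitude.
   public class WindowedCount extends WindowedSum {
       @Override
       protected void update(Sample sample, MetricConfig config, double value, long now) {
           super.update(sample, config, 1.0, now);
       }
   }
   ```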
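To make the suggestion concrete, here is a simplified, non-windowed model in which every stat attached to a sensor sees the same recorded value. The `Stat`, `Count`, `Avg`, `Max` and `Sensor` types are hypothetical stand-ins, not Kafka's `org.apache.kafka.common.metrics` classes; `Count` mimics `WindowedCount` by adding 1.0 per call and ignoring the value.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, non-windowed model of a sensor: every attached stat receives
// the same recorded value on each record() call.
final class SingleSensorSketch {

    interface Stat {
        void record(double value);
        double measure();
    }

    // Like WindowedCount: ignores the recorded value and adds 1.0 per call.
    static final class Count implements Stat {
        private double count;
        public void record(double value) { count += 1.0; }
        public double measure() { return count; }
    }

    // Arithmetic mean of the recorded values (NaN before any record).
    static final class Avg implements Stat {
        private double sum;
        private long n;
        public void record(double value) { sum += value; n++; }
        public double measure() { return n == 0 ? Double.NaN : sum / n; }
    }

    // Largest recorded value seen so far.
    static final class Max implements Stat {
        private double max = Double.NEGATIVE_INFINITY;
        public void record(double value) { max = Math.max(max, value); }
        public double measure() { return max; }
    }

    static final class Sensor {
        private final List<Stat> stats = new ArrayList<>();

        Sensor add(Stat stat) {
            stats.add(stat);
            return this;
        }

        // One call feeds every stat, so count, avg and max stay consistent.
        void record(double value) {
            for (Stat stat : stats) {
                stat.record(value);
            }
        }
    }
}
```

Under this model, `recordPluginExport(clientInstanceId, timeMs)` could make a single `record(timeMs)` call on one sensor, with the export count, average time and maximum time all derived from it.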



##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno
         // Create new client metrics manager which simulates a new server as it will not have any
         // last request information but request should succeed as subscription id should match
         // the one with new client instance.
-
-        ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time);
+        kafkaMetrics = new Metrics();

Review Comment:
   Then, should we close the new `kafkaMetrics` at the end of the test? Also, could we make the new `kafkaMetrics` a local variable? Otherwise, the old one won't be closed. Finally, should we close `newClientMetricsManager` at the end of the test?
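A sketch of the cleanup being asked for: declare the second metrics registry and manager as locals in a try-with-resources block, so both are closed even if an assertion fails, while the shared `kafkaMetrics` field is left alone for the usual teardown. `StubMetrics`, `StubManager` and `simulateNewServer` are hypothetical stand-ins; Kafka's `Metrics` does implement `Closeable`, and the review implies `ClientMetricsManager` has a `close()` method as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Metrics and ClientMetricsManager: each records
// when it is closed so the cleanup can be checked.
final class TestCleanupSketch {

    static final List<String> closed = new ArrayList<>();

    static final class StubMetrics implements AutoCloseable {
        @Override
        public void close() { closed.add("metrics"); }
    }

    static final class StubManager implements AutoCloseable {
        private final StubMetrics metrics; // the manager uses the metrics it was given

        StubManager(StubMetrics metrics) { this.metrics = metrics; }

        @Override
        public void close() { closed.add("manager"); }
    }

    // The simulated "new server" gets its own local metrics instead of
    // overwriting the shared field, so nothing leaks if the body throws.
    static void simulateNewServer(Runnable assertions) {
        try (StubMetrics newKafkaMetrics = new StubMetrics();
             StubManager newClientMetricsManager = new StubManager(newKafkaMetrics)) {
            assertions.run();
        }
    }
}
```

Resources in a try-with-resources header are closed in reverse declaration order, so the manager is closed before the metrics instance it depends on.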


