junrao commented on code in PR #15251: URL: https://github.com/apache/kafka/pull/15251#discussion_r1470043472
########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -288,6 +307,9 @@ private ClientMetricsInstance createClientInstanceAndUpdateCache(Uuid clientInst ClientMetricsInstanceMetadata instanceMetadata) { ClientMetricsInstance clientInstance = createClientInstance(clientInstanceId, instanceMetadata); + // Maybe add client metrics, if metrics not already added. Metrics might be already added + // if the client instance was evicted from the cache because of size limit. Review Comment: Hmm, if we evict a client instance from LRU cache, should we remove the corresponding metrics? ########## server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java: ########## @@ -493,4 +519,123 @@ public void run() { } } } + + // Visible for testing + final class ClientMetricsStats { + + private static final String GROUP_NAME = "ClientMetrics"; + + // Visible for testing + static final String INSTANCE_COUNT = "ClientMetricsInstanceCount"; + static final String UNKNOWN_SUBSCRIPTION_REQUEST = "ClientMetricsUnknownSubscriptionRequest"; + static final String THROTTLE = "ClientMetricsThrottle"; + static final String PLUGIN_EXPORT = "ClientMetricsPluginExport"; + static final String PLUGIN_ERROR = "ClientMetricsPluginError"; + static final String PLUGIN_EXPORT_TIME = "ClientMetricsPluginExportTime"; + + // Names of sensors that are registered through client instances. + private final Set<String> sensorsName = ConcurrentHashMap.newKeySet(); + // List of metric names which are not specific to a client instance. Do not require thread + // safe structure as it will be populated only in constructor. 
+ private final List<MetricName> registeredMetricNames = new ArrayList<>(); + + private final Set<String> instanceMetrics = Stream.of(UNKNOWN_SUBSCRIPTION_REQUEST, + THROTTLE, PLUGIN_EXPORT, PLUGIN_ERROR, PLUGIN_EXPORT_TIME).collect(Collectors.toSet()); + + ClientMetricsStats() { + Measurable instanceCount = (config, now) -> clientInstanceCache.size(); + MetricName instanceCountMetric = metrics.metricName(INSTANCE_COUNT, GROUP_NAME, + "The current number of client metric instances being managed by the broker"); + metrics.addMetric(instanceCountMetric, instanceCount); + registeredMetricNames.add(instanceCountMetric); + } + + public void maybeAddClientInstanceMetrics(Uuid clientInstanceId) { + // If one sensor of the metrics has been registered for the client instance, + // then all other sensors should have been registered; and vice versa. + if (metrics.getSensor(PLUGIN_EXPORT + "-" + clientInstanceId) != null) { + return; + } + + Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); + + Sensor unknownSubscriptionRequestCountSensor = metrics.sensor( + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST + "-" + clientInstanceId); + unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(), + ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, tags)); + sensorsName.add(unknownSubscriptionRequestCountSensor.name()); + + Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId); + throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags)); + sensorsName.add(throttleCount.name()); + + Sensor pluginExportCount = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT + "-" + clientInstanceId); + pluginExportCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_EXPORT, tags)); + sensorsName.add(pluginExportCount.name()); + + Sensor pluginErrorCount = metrics.sensor(ClientMetricsStats.PLUGIN_ERROR + "-" 
+ clientInstanceId); + pluginErrorCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.PLUGIN_ERROR, tags)); + sensorsName.add(pluginErrorCount.name()); + + Sensor pluginExportTime = metrics.sensor(ClientMetricsStats.PLUGIN_EXPORT_TIME + "-" + clientInstanceId); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Avg", + ClientMetricsStats.GROUP_NAME, "Average time broker spent in invoking plugin exportMetrics call", tags), new Avg()); + pluginExportTime.add(metrics.metricName(ClientMetricsStats.PLUGIN_EXPORT_TIME + "Max", + ClientMetricsStats.GROUP_NAME, "Maximum time broker spent in invoking plugin exportMetrics call", tags), new Max()); + sensorsName.add(pluginExportTime.name()); + } + + public void recordUnknownSubscriptionCount(Uuid clientInstanceId) { + record(UNKNOWN_SUBSCRIPTION_REQUEST, clientInstanceId); + } + + public void recordThrottleCount(Uuid clientInstanceId) { + record(THROTTLE, clientInstanceId); + } + + public void recordPluginExport(Uuid clientInstanceId, long timeMs) { + record(PLUGIN_EXPORT, clientInstanceId); + record(PLUGIN_EXPORT_TIME, clientInstanceId, timeMs); Review Comment: > I might be wrong, but we have record method on sensor where PLUGIN_EXPORT sensor records a value of 1 on every invocation but PLUGIN_EXPORT_TIME needs to record time. Hmm, but that's exactly what `WindowedCount` does: it always records a value of 1, independent of the passed-in value. The passed-in value can then be used to calculate avg and max, so recording the export time once on a single sensor could feed the count, avg and max metrics together.
``` public class WindowedCount extends WindowedSum { @Override protected void update(Sample sample, MetricConfig config, double value, long now) { super.update(sample, config, 1.0, now); } } ``` ########## server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java: ########## @@ -299,8 +332,8 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Unkno // Create new client metrics manager which simulates a new server as it will not have any // last request information but request should succeed as subscription id should match // the one with new client instance. - - ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time); + kafkaMetrics = new Metrics(); Review Comment: Then, should we close the new `kafkaMetrics` at the end of the test? Also, could we make the new `kafkaMetrics` a local variable? Otherwise, reassigning the field drops the reference to the old instance, so the old one won't be closed. Finally, should we close `newClientMetricsManager` at the end of the test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org