vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472250612



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       It seems oddly granular to synchronize the sensor and metric collections
individually, since we always remove all of both collections together. If it
doesn't matter, then do we need to synchronize at all?
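
To make the suggestion concrete, here is a minimal, self-contained sketch of guarding both collections with one lock so that removal happens in a single critical section. It does not use the Kafka classes: `StoreLevelRegistry`, `storeLevelLock`, and the `String` entries standing in for sensors and metric names are all hypothetical, and the real `StreamsMetricsImpl` may need a different shape.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the store-level bookkeeping in StreamsMetricsImpl.
// Strings replace the real Sensor and MetricName types to keep this runnable.
class StoreLevelRegistry {
    // One lock for both collections, since they are always cleared together.
    private final Object storeLevelLock = new Object();
    private final Map<String, Deque<String>> storeLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> storeLevelMetrics = new HashMap<>();

    void addSensor(final String key, final String sensorName) {
        synchronized (storeLevelLock) {
            storeLevelSensors.computeIfAbsent(key, ignored -> new ArrayDeque<>()).push(sensorName);
        }
    }

    void addMetric(final String key, final String metricName) {
        synchronized (storeLevelLock) {
            storeLevelMetrics.computeIfAbsent(key, ignored -> new ArrayDeque<>()).push(metricName);
        }
    }

    // Removes every sensor and metric registered under the key in one critical
    // section and returns how many entries were dropped.
    int removeAllSensorsAndMetrics(final String key) {
        synchronized (storeLevelLock) {
            final Deque<String> sensors = storeLevelSensors.remove(key);
            final Deque<String> metrics = storeLevelMetrics.remove(key);
            final int sensorCount = sensors == null ? 0 : sensors.size();
            final int metricCount = metrics == null ? 0 : metrics.size();
            return sensorCount + metricCount;
        }
    }
}
```

With a shared lock, no other thread can observe a store whose metrics are gone but whose sensors are still registered. Whether that intermediate state is actually harmful here is the open question in the comment above.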




