guozhangwang commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r472429141



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
##########
@@ -33,7 +35,7 @@
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;
 
-public class RocksDBMetrics {
+public class  RocksDBMetrics {

Review comment:
       nit: extra space.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       This may be a bit paranoid, but when adding them, the order seems to be 
`initSensors` first and then `initGauges`, while when removing we call 
`removeAllStoreLevelMetrics` first and then `removeAllStoreLevelSensors`. I 
know that today there should be no concurrent threads trying to init / 
removeAll at the same time, but just to be safe, maybe we can make the removal 
order sensors first and then gauges as well?
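
       A minimal sketch of the suggested ordering, not the actual 
`StreamsMetricsImpl` code: the class `StoreLevelRemovalOrder`, its `calls` 
list, and the stubbed method bodies are hypothetical stand-ins that only record 
which removal step ran first. The point is just that removal follows the same 
sensors-then-gauges order as `initSensors` / `initGauges`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamsMetricsImpl; it only records call order.
public class StoreLevelRemovalOrder {
    private final List<String> calls = new ArrayList<>();

    // Stub: the real method would remove the store-level sensors.
    void removeAllStoreLevelSensors(final String threadId,
                                    final String taskId,
                                    final String storeName) {
        calls.add("removeAllStoreLevelSensors");
    }

    // Stub: the real method would remove the store-level gauges (mutable metrics).
    void removeAllStoreLevelMetrics(final String threadId,
                                    final String taskId,
                                    final String storeName) {
        calls.add("removeAllStoreLevelMetrics");
    }

    // Suggested ordering: sensors first, then gauges, mirroring the add path.
    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
                                                           final String taskId,
                                                           final String storeName) {
        removeAllStoreLevelSensors(threadId, taskId, storeName);
        removeAllStoreLevelMetrics(threadId, taskId, storeName);
    }

    public List<String> calls() {
        return calls;
    }

    public static void main(final String[] args) {
        final StoreLevelRemovalOrder metrics = new StoreLevelRemovalOrder();
        // Placeholder identifiers, chosen only for illustration.
        metrics.removeAllStoreLevelSensorsAndMetrics("thread-1", "0_0", "my-store");
        System.out.println(metrics.calls());
    }
}
```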



