guozhangwang commented on a change in pull request #9177: URL: https://github.com/apache/kafka/pull/9177#discussion_r472429141
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java
##########
@@ -33,7 +35,7 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addSumMetricToSensor;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;
 
-public class RocksDBMetrics {
+public class RocksDBMetrics {

Review comment:
nit: extra space.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
This may be a bit paranoid, but when adding them, the order seems to be `initSensors` first and
then `initGauges`, whereas on removal we call `removeAllStoreLevelMetrics` first and then `removeAllStoreLevelSensors`. I know that today there should be no concurrent threads trying to init and removeAll at the same time, but just to be safe, maybe we can make the removal order match: sensors first and then gauges?
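To make the suggested ordering concrete, here is a minimal, self-contained sketch. It does not use the real `StreamsMetricsImpl`; the class `StoreLevelRegistrySketch`, its `addSensor` / `addGauge` helpers, and the `removalLog` are hypothetical stand-ins introduced only to show sensors being removed before gauges, mirroring the `initSensors` then `initGauges` order on the add path.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for StreamsMetricsImpl; only the call ordering matters here.
public class StoreLevelRegistrySketch {
    private final Map<String, Deque<String>> storeLevelSensors = new HashMap<>();
    private final Map<String, Deque<String>> storeLevelMetrics = new HashMap<>();
    // Records the order in which entries are torn down, so it can be inspected.
    private final List<String> removalLog = new ArrayList<>();

    public void addSensor(final String key, final String sensorName) {
        synchronized (storeLevelSensors) {
            storeLevelSensors.computeIfAbsent(key, ignored -> new LinkedList<>()).push(sensorName);
        }
    }

    public void addGauge(final String key, final String metricName) {
        synchronized (storeLevelMetrics) {
            storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
        }
    }

    // Sensors first, then gauges: the same order in which they were initialized.
    public void removeAllStoreLevelSensorsAndMetrics(final String key) {
        removeAll(storeLevelSensors, key, "sensor:");
        removeAll(storeLevelMetrics, key, "metric:");
    }

    private void removeAll(final Map<String, Deque<String>> registry,
                           final String key,
                           final String prefix) {
        synchronized (registry) {
            final Deque<String> names = registry.remove(key);
            while (names != null && !names.isEmpty()) {
                removalLog.add(prefix + names.pop());
            }
        }
    }

    public List<String> removalLog() {
        return removalLog;
    }
}
```

In the actual PR this would presumably amount to swapping the two calls inside `removeAllStoreLevelSensorsAndMetrics`, so that `removeAllStoreLevelSensors` runs before `removeAllStoreLevelMetrics`.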