vvcephei commented on a change in pull request #9177:
URL: https://github.com/apache/kafka/pull/9177#discussion_r471582580



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {

Review comment:
       Should we check `if (metrics.metric(metricName) == null)` again after synchronizing?

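Something like this is what I have in mind: a minimal sketch, reusing the names from the diff, where the null check is repeated inside the `synchronized` block so two threads racing past the outer check cannot both register the metric.

```java
if (metrics.metric(metricName) == null) {
    final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
    final String key = storeSensorPrefix(threadId, taskId, storeName);
    synchronized (storeLevelMetrics) {
        // Re-check after acquiring the lock: another thread may have registered
        // the metric between the outer check and entering the synchronized block.
        if (metrics.metric(metricName) == null) {
            metrics.addMetric(metricName, metricConfig, valueProvider);
            storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
        }
    }
}
```
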
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -415,9 +416,40 @@ public final Sensor storeLevelSensor(final String threadId,
         }
     }
 
-    public final void removeAllStoreLevelSensors(final String threadId,
-                                                 final String taskId,
-                                                 final String storeName) {
+    public <T> void addStoreLevelMutableMetric(final String threadId,
+                                               final String taskId,
+                                               final String metricsScope,
+                                               final String storeName,
+                                               final String name,
+                                               final String description,
+                                               final RecordingLevel recordingLevel,
+                                               final Gauge<T> valueProvider) {
+        final MetricName metricName = metrics.metricName(
+            name,
+            STATE_STORE_LEVEL_GROUP,
+            description,
+            storeLevelTagMap(threadId, taskId, metricsScope, storeName)
+        );
+        if (metrics.metric(metricName) == null) {
+            final MetricConfig metricConfig = new MetricConfig().recordLevel(recordingLevel);
+            final String key = storeSensorPrefix(threadId, taskId, storeName);
+            synchronized (storeLevelMetrics) {
+                metrics.addMetric(metricName, metricConfig, valueProvider);
+                storeLevelMetrics.computeIfAbsent(key, ignored -> new LinkedList<>()).push(metricName);
+            }
+        }
+    }
+
+    public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
+                                                           final String taskId,
+                                                           final String storeName) {
+        removeAllStoreLevelMetrics(threadId, taskId, storeName);
+        removeAllStoreLevelSensors(threadId, taskId, storeName);
+    }

Review comment:
       Should we make this all one method, and also synchronize both storeLevel collections on a single monitor?

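Roughly what I mean, as a sketch only: I am assuming the two collections are `storeLevelSensors` and `storeLevelMetrics`, both keyed by the store prefix and holding `Deque`s, and the removal bodies below are illustrative rather than the real ones.

```java
public final void removeAllStoreLevelSensorsAndMetrics(final String threadId,
                                                        final String taskId,
                                                        final String storeName) {
    final String key = storeSensorPrefix(threadId, taskId, storeName);
    // One monitor guards both store-level collections, so a concurrent
    // registration cannot interleave between removing metrics and sensors.
    synchronized (storeLevelMetrics) {
        final Deque<MetricName> metricNames = storeLevelMetrics.remove(key);
        if (metricNames != null) {
            metricNames.forEach(metrics::removeMetric);
        }
        final Deque<String> sensorNames = storeLevelSensors.remove(key);
        if (sensorNames != null) {
            sensorNames.forEach(metrics::removeSensor);
        }
    }
}
```

The registration paths (`storeLevelSensor` and `addStoreLevelMutableMetric`) would then need to synchronize on that same monitor.
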
##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
##########
@@ -609,6 +611,37 @@ public void shouldVerifyThatMetricsGetMeasurementsFromRocksDB() {
         assertThat((double) bytesWrittenTotal.metricValue(), greaterThan(0d));
     }
 
+    @Test
+    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
+        final TaskId taskId = new TaskId(0, 0);
+
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(RecordingLevel.INFO));
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, "test-application", StreamsConfig.METRICS_LATEST, time);
+
+        context = EasyMock.niceMock(InternalMockProcessorContext.class);
+        EasyMock.expect(context.metrics()).andStubReturn(streamsMetrics);
+        EasyMock.expect(context.taskId()).andStubReturn(taskId);
+        EasyMock.expect(context.appConfigs())
+                .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
+        EasyMock.expect(context.stateDir()).andStubReturn(dir);
+        EasyMock.replay(context);
+
+        rocksDBStore.init(context, rocksDBStore);
+        final byte[] key = "hello".getBytes();
+        final byte[] value = "world".getBytes();
+        rocksDBStore.put(Bytes.wrap(key), value);
+
+        final Metric numberOfEntriesActiveMemTable = metrics.metric(new MetricName(
+            "num-entries-active-mem-table",
+            StreamsMetricsImpl.STATE_STORE_LEVEL_GROUP,
+            "description is not verified",
+            streamsMetrics.storeLevelTagMap(Thread.currentThread().getName(), taskId.toString(), METRICS_SCOPE, DB_NAME)
+        ));
+        assertThat(numberOfEntriesActiveMemTable, notNullValue());
+        assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), greaterThan(BigInteger.valueOf(0)));

Review comment:
       would it not be exactly `1`?

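If it is, the assertion could be tightened to an exact match; a sketch, assuming RocksDB reports exactly one active-memtable entry for the single `put` above and that Hamcrest's `is` is statically imported:

```java
// Exact-match variant, assuming the active memtable holds only the one entry
// written by the put() above.
assertThat((BigInteger) numberOfEntriesActiveMemTable.metricValue(), is(BigInteger.valueOf(1)));
```
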
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -56,6 +62,9 @@ public void maybeCloseStatistics() {
         }
     }
 
+    private static final String ROCKSDB_PROPERTIES_PREFIX = "rocksdb.";
+    private static final ByteBuffer CONVERSION_BUFFER = ByteBuffer.allocate(Long.BYTES);

Review comment:
       It seems a little risky to use this in a multithreaded context. Why not just create a new short-lived buffer each time for the conversion?

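For example, something along these lines (a sketch only; I am assuming the shared buffer is used to turn a `long` property value into bytes for an unsigned `BigInteger`, and `toUnsignedBigInteger` is a made-up helper name):

```java
// Hypothetical helper: a fresh, stack-confined buffer per conversion means there
// is no shared mutable state between threads recording metrics concurrently.
private static BigInteger toUnsignedBigInteger(final long value) {
    final byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    return new BigInteger(1, bytes);
}
```

The per-call allocation is tiny and short-lived, so the GC cost should be negligible.
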



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

