guozhangwang commented on a change in pull request #9232:
URL: https://github.com/apache/kafka/pull/9232#discussion_r480477639



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
                 statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {

Review comment:
       Hmm, why do we need the second condition to determine `singleCache = false` here?
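
For context on the condition being questioned: the check appears to hinge on the first comparison deciding the mode. When `storeToValueProviders` holds exactly one entry, comparing the new cache against that single existing provider is what determines whether all segments share one cache; only subsequent additions can then violate the invariant. A minimal standalone sketch of that invariant, with hypothetical names and plain `Object` standing in for the RocksDB `Cache` (not the PR's actual code):

```java
import java.util.List;

public class CacheInvariantSketch {

    // Returns true if all segments share one cache instance, false if each
    // segment has its own distinct cache; throws if the caches are mixed.
    static boolean verifyCaches(final List<Object> caches) {
        boolean singleCache = true;
        for (int i = 1; i < caches.size(); i++) {
            final boolean sameAsFirst = caches.get(i) == caches.get(0);
            if (i == 1) {
                // the first comparison decides which mode applies, analogous to
                // the storeToValueProviders.size() == 1 branch in the diff above
                singleCache = sameAsFirst;
            } else if (singleCache != sameAsFirst) {
                throw new IllegalStateException("caches are neither all shared nor all distinct");
            }
        }
        return singleCache;
    }

    public static void main(final String[] args) {
        final Object shared = new Object();
        System.out.println(verifyCaches(List.of(shared, shared, shared)));     // true
        System.out.println(verifyCaches(List.of(new Object(), new Object()))); // false
    }
}
```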

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist
                 statistics != null &&
                 storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) {
 
-            throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId +
-                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " +
+            throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId +
+                " is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " +
                 "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " +
                 "This is a bug in Kafka Streams. " +
                 "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
         }
     }
 
+    private void verifyDbAndCacheAndStatistics(final String segmentName,
+                                               final RocksDB db,
+                                               final Cache cache,
+                                               final Statistics statistics) {
+        for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) {
+            verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics");
+            verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache");
+            if (db == valueProviders.db) {
+                throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId +
+                    " was already added for another segment as a value provider. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+            if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) {
+                singleCache = false;
+            } else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) {
+                throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId +
+                    " are either not all distinct or do not all refer to the same cache. This is a bug in Kafka Streams. " +
+                    "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
+            }
+        }
+    }
+
+    private void verifyIfSomeAreNull(final String segmentName,

Review comment:
       nit: `verifyConsistentSegmentValueProviders`?
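
For readers without the full file: a helper of this shape presumably throws when exactly one of the two references is null, i.e. when the new segment's value provider is inconsistent with an existing one. A minimal sketch under that assumption (names and message are illustrative, not the PR's actual code):

```java
public class NullConsistencySketch {

    // Throws if exactly one of the two references is null.
    static void verifyIfSomeAreNull(final String segmentName,
                                    final Object newValue,
                                    final Object existingValue,
                                    final String description) {
        if ((newValue == null) != (existingValue == null)) {
            throw new IllegalStateException(description + " for segment " + segmentName +
                " is" + (newValue == null ? " " : " not ") + "null although the " + description +
                " of another segment is" + (newValue != null ? " " : " not ") + "null");
        }
    }

    public static void main(final String[] args) {
        verifyIfSomeAreNull("segment-1", new Object(), new Object(), "cache"); // OK: both non-null
        verifyIfSomeAreNull("segment-1", null, null, "cache");                 // OK: both null
        verifyIfSomeAreNull("segment-1", null, new Object(), "cache");         // throws
    }
}
```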

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java
##########
@@ -174,22 +241,163 @@ private void initSensors(final StreamsMetricsImpl streamsMetrics, final RocksDBM
         numberOfFileErrorsSensor = RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricContext);
     }
 
-    private void initGauges(final StreamsMetricsImpl streamsMetrics, final RocksDBMetricContext metricContext) {
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(streamsMetrics, metricContext, (metricsConfig, now) -> {
+    private void initGauges(final StreamsMetricsImpl streamsMetrics,
+                            final RocksDBMetricContext metricContext) {
+        RocksDBMetrics.addNumImmutableMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addCurSizeActiveMemTable(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addCurSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(CURRENT_SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addSizeAllMemTables(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(SIZE_OF_ALL_MEMTABLES)
+        );
+        RocksDBMetrics.addNumEntriesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_ACTIVE_MEMTABLE)
+        );
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_ENTRIES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_DELETES_IMMUTABLE_MEMTABLES)
+        );
+        RocksDBMetrics.addMemTableFlushPending(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(MEMTABLE_FLUSH_PENDING)
+        );
+        RocksDBMetrics.addNumRunningFlushesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_FLUSHES)
+        );
+        RocksDBMetrics.addCompactionPendingMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(COMPACTION_PENDING)
+        );
+        RocksDBMetrics.addNumRunningCompactionsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_RUNNING_COMPACTIONS)
+        );
+        RocksDBMetrics.addEstimatePendingCompactionBytesMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_BYTES_OF_PENDING_COMPACTION)
+        );
+        RocksDBMetrics.addTotalSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(TOTAL_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addLiveSstFilesSizeMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(LIVE_SST_FILES_SIZE)
+        );
+        RocksDBMetrics.addNumLiveVersionMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_LIVE_VERSIONS)
+        );
+        RocksDBMetrics.addEstimateNumKeysMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_NUMBER_OF_KEYS)
+        );
+        RocksDBMetrics.addEstimateTableReadersMemMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(ESTIMATED_MEMORY_OF_TABLE_READERS)
+        );
+        RocksDBMetrics.addBackgroundErrorsMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeSumOfProperties(NUMBER_OF_BACKGROUND_ERRORS)
+        );
+        RocksDBMetrics.addBlockCacheCapacityMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(CAPACITY_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCacheUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(USAGE_OF_BLOCK_CACHE)
+        );
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(
+            streamsMetrics,
+            metricContext,
+            gaugeToComputeBlockCacheMetrics(PINNED_USAGE_OF_BLOCK_CACHE)
+        );
+    }
+
+    private Gauge<BigInteger> gaugeToComputeSumOfProperties(final String propertyName) {
+        return (metricsConfig, now) -> {
             BigInteger result = BigInteger.valueOf(0);
             for (final DbAndCacheAndStatistics valueProvider : storeToValueProviders.values()) {
                 try {
                     // values of RocksDB properties are of type unsigned long in C++, i.e., in Java we need to use
                     // BigInteger and construct the object from the byte representation of the value
                     result = result.add(new BigInteger(1, longToBytes(
-                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + NUMBER_OF_ENTRIES_ACTIVE_MEMTABLE))));
+                        valueProvider.db.getAggregatedLongProperty(ROCKSDB_PROPERTIES_PREFIX + propertyName)
+                    )));
+                } catch (final RocksDBException e) {

Review comment:
       Is this a piggy-backed fix to wrap RocksDBException here?
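
For context on the conversion the gauge performs: RocksDB reports these properties as unsigned 64-bit values, so the signed Java `long` returned by `getAggregatedLongProperty` must be reinterpreted, which is why the recorder sums `BigInteger`s built from the raw bytes. A minimal sketch of that `longToBytes` + `BigInteger(1, ...)` pattern (the helper name follows the diff; the standalone class is illustrative):

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class UnsignedLongSketch {

    // Big-endian byte representation of the long, as BigInteger(1, bytes) expects.
    static byte[] longToBytes(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    public static void main(final String[] args) {
        // A property value with the sign bit set is negative as a signed long ...
        final long raw = 0xFFFFFFFFFFFFFFFFL;
        System.out.println(raw); // -1

        // ... but interpreting the same bytes as an unsigned magnitude keeps the full value.
        System.out.println(new BigInteger(1, longToBytes(raw))); // 18446744073709551615
    }
}
```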




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

