This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ace85f25d11 [SPARK-43120][SS] Add support for tracking pinned blocks memory usage for RocksDB state store
ace85f25d11 is described below

commit ace85f25d11d6a771ec45493e73bd25c98b22803
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Thu Apr 13 22:16:30 2023 +0900

    [SPARK-43120][SS] Add support for tracking pinned blocks memory usage for RocksDB state store
    
    ### What changes were proposed in this pull request?
    Add support for tracking pinned blocks memory usage for RocksDB state store
    
    ### Why are the changes needed?
    Today we only track the total memory usage for RocksDB, which comprises the write buffer, block cache, and index/filter blocks. It's also useful to understand the memory usage of blocks pinned in the cache, potentially due to iterators not releasing them.
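    
    For context, the new counter is backed by the RocksDB property `rocksdb.block-cache-pinned-usage` (see the diff below). As a minimal, hedged sketch, the same property can be read straight through the RocksJava API rather than Spark's internal getDBProperty helper; the object name and the throwaway DB path here are assumptions for illustration only:
    
    ```scala
    import org.rocksdb.{Options, RocksDB}
    
    object PinnedBlocksUsageSketch {
      def main(args: Array[String]): Unit = {
        RocksDB.loadLibrary()
        val options = new Options().setCreateIfMissing(true)
        // Throwaway path, purely for illustration
        val db = RocksDB.open(options, "/tmp/rocksdb-pinned-usage-demo")
        try {
          // The same property name this patch reads via getDBProperty
          val pinnedBytes = db.getProperty("rocksdb.block-cache-pinned-usage").toLong
          println(s"pinned blocks memory usage: $pinnedBytes bytes")
        } finally {
          db.close()
          options.close()
        }
      }
    }
    ```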
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Modified unit tests
    
    ```
    [info] Run completed in 11 seconds, 964 milliseconds.
    [info] Total number of tests run: 5
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 5, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Also verified that the metrics appear in the custom metrics for the state store:
    
    ```
        "customMetrics" : {
          "rocksdbBytesCopied" : 1189,
          "rocksdbCommitCheckpointLatency" : 14,
          "rocksdbCommitCompactLatency" : 0,
          "rocksdbCommitFileSyncLatencyMs" : 150,
          "rocksdbCommitFlushLatency" : 15,
          "rocksdbCommitPauseLatency" : 0,
          "rocksdbCommitWriteBatchLatency" : 4,
          "rocksdbFilesCopied" : 1,
          "rocksdbFilesReused" : 0,
          "rocksdbGetCount" : 6,
          "rocksdbGetLatency" : 0,
          "rocksdbPinnedBlocksMemoryUsage" : 87,
          "rocksdbPutCount" : 1,
          "rocksdbPutLatency" : 4,
          "rocksdbReadBlockCacheHitCount" : 0,
          "rocksdbReadBlockCacheMissCount" : 0,
          "rocksdbSstFileSize" : 1189,
          "rocksdbTotalBytesRead" : 0,
          "rocksdbTotalBytesReadByCompaction" : 0,
          "rocksdbTotalBytesReadThroughIterator" : 0,
          "rocksdbTotalBytesWritten" : 123,
          "rocksdbTotalBytesWrittenByCompaction" : 0,
          "rocksdbTotalBytesWrittenByFlush" : 1325,
          "rocksdbTotalCompactionLatencyMs" : 0,
          "rocksdbWriterStallLatencyMs" : 0,
          "rocksdbZipFileBytesUncompressed" : 7283
        }
    ```
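    
    A hedged sketch of reading the new value programmatically, assuming an active StreamingQuery handle named `query` over a stateful operator:
    
    ```scala
    // customMetrics is a java.util.Map[String, java.lang.Long] on StateOperatorProgress
    val pinnedBytes = query.lastProgress
      .stateOperators(0)
      .customMetrics
      .get("rocksdbPinnedBlocksMemoryUsage")
    println(s"rocksdbPinnedBlocksMemoryUsage: $pinnedBytes bytes")
    ```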
    
    Closes #40770 from anishshri-db/task/SPARK-43120.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../apache/spark/sql/execution/streaming/state/RocksDB.scala |  3 +++
 .../streaming/state/RocksDBStateStoreProvider.scala          | 12 ++++++++----
 .../streaming/state/RocksDBStateStoreIntegrationSuite.scala  |  3 ++-
 .../spark/sql/execution/streaming/state/RocksDBSuite.scala   |  1 +
 4 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 78a674686b7..c0330f75088 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -393,6 +393,7 @@ class RocksDB(
     val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
     val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
     val blockCacheUsage = getDBProperty("rocksdb.block-cache-usage")
+    val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage")
     // Get the approximate memory usage of this writeBatchWithIndex
     val writeBatchMemUsage = writeBatch.getWriteBatch.getDataSize
     val nativeOpsHistograms = Seq(
@@ -431,6 +432,7 @@ class RocksDB(
       numKeysOnLoadedVersion,
       numKeysOnWritingVersion,
       readerMemUsage + memTableMemUsage + blockCacheUsage + writeBatchMemUsage,
+      pinnedBlocksMemUsage,
       writeBatchMemUsage,
       totalSSTFilesBytes,
       nativeOpsLatencyMicros.toMap,
@@ -708,6 +710,7 @@ case class RocksDBMetrics(
     numCommittedKeys: Long,
     numUncommittedKeys: Long,
     totalMemUsageBytes: Long,
+    pinnedBlocksMemUsage: Long,
     writeBatchMemUsageBytes: Long,
     totalSSTFilesBytes: Long,
     nativeOpsHistograms: Map[String, RocksDBNativeHistogram],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 3a128561b1d..b4b648c3693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -143,7 +143,8 @@ private[sql] class RocksDBStateStoreProvider
         CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"),
         CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"),
         CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"),
-        CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush")
+        CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush"),
+        CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE -> rocksDBMetrics.pinnedBlocksMemUsage
       ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes =>
         Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map())
 
@@ -299,8 +300,10 @@ object RocksDBStateStoreProvider {
     "RocksDB: compaction - total bytes written by the compaction process")
   val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric(
     "rocksdbTotalBytesWrittenByFlush",
-    "RocksDB: flush - total bytes written by flush"
-  )
+    "RocksDB: flush - total bytes written by flush")
+  val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric(
+    "rocksdbPinnedBlocksMemoryUsage",
+    "RocksDB: memory usage for pinned blocks")
 
   // Total SST file size
   val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric(
@@ -315,6 +318,7 @@ object RocksDBStateStoreProvider {
    CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ,
    CUSTOM_METRIC_BYTES_WRITTEN, CUSTOM_METRIC_ITERATOR_BYTES_READ, CUSTOM_METRIC_STALL_TIME,
     CUSTOM_METRIC_TOTAL_COMPACT_TIME, CUSTOM_METRIC_COMPACT_READ_BYTES,
-    CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES
+    CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES,
+    CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE
   )
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index d1af0acb530..345526bb986 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -95,7 +95,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
               "rocksdbReadBlockCacheHitCount", 
"rocksdbReadBlockCacheMissCount",
               "rocksdbTotalBytesReadByCompaction", 
"rocksdbTotalBytesWrittenByCompaction",
               "rocksdbTotalCompactionLatencyMs", "rocksdbWriterStallLatencyMs",
-              "rocksdbTotalBytesReadThroughIterator", 
"rocksdbTotalBytesWrittenByFlush"))
+              "rocksdbTotalBytesReadThroughIterator", 
"rocksdbTotalBytesWrittenByFlush",
+              "rocksdbPinnedBlocksMemoryUsage"))
           }
         } finally {
           query.stop()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 3d45a8868e0..4b368f7f6cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -607,6 +607,7 @@ class RocksDBSuite extends SparkFunSuite {
         assert(metrics.nativeOpsHistograms("compaction").count > 0)
         assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0)
         assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0)
+        assert(metrics.pinnedBlocksMemUsage >= 0)
       }
     }
   }

