This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ace85f25d11 [SPARK-43120][SS] Add support for tracking pinned blocks memory usage for RocksDB state store ace85f25d11 is described below commit ace85f25d11d6a771ec45493e73bd25c98b22803 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Thu Apr 13 22:16:30 2023 +0900 [SPARK-43120][SS] Add support for tracking pinned blocks memory usage for RocksDB state store ### What changes were proposed in this pull request? Add support for tracking pinned blocks memory usage for RocksDB state store ### Why are the changes needed? Today we only track total memory usage for RocksDB that comprises the write buffer, block cache and index/filter blocks. It's also useful to understand the usage of blocks pinned in the cache, potentially due to iterators not releasing them. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Modified unit tests ``` [info] Run completed in 11 seconds, 964 milliseconds. [info] Total number of tests run: 5 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 5, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. 
``` Also, verified that the metrics appear in the custom metrics for state store: ``` "customMetrics" : { "rocksdbBytesCopied" : 1189, "rocksdbCommitCheckpointLatency" : 14, "rocksdbCommitCompactLatency" : 0, "rocksdbCommitFileSyncLatencyMs" : 150, "rocksdbCommitFlushLatency" : 15, "rocksdbCommitPauseLatency" : 0, "rocksdbCommitWriteBatchLatency" : 4, "rocksdbFilesCopied" : 1, "rocksdbFilesReused" : 0, "rocksdbGetCount" : 6, "rocksdbGetLatency" : 0, "rocksdbPinnedBlocksMemoryUsage" : 87, "rocksdbPutCount" : 1, "rocksdbPutLatency" : 4, "rocksdbReadBlockCacheHitCount" : 0, "rocksdbReadBlockCacheMissCount" : 0, "rocksdbSstFileSize" : 1189, "rocksdbTotalBytesRead" : 0, "rocksdbTotalBytesReadByCompaction" : 0, "rocksdbTotalBytesReadThroughIterator" : 0, "rocksdbTotalBytesWritten" : 123, "rocksdbTotalBytesWrittenByCompaction" : 0, "rocksdbTotalBytesWrittenByFlush" : 1325, "rocksdbTotalCompactionLatencyMs" : 0, "rocksdbWriterStallLatencyMs" : 0, "rocksdbZipFileBytesUncompressed" : 7283 } ``` Closes #40770 from anishshri-db/task/SPARK-43120. 
Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../apache/spark/sql/execution/streaming/state/RocksDB.scala | 3 +++ .../streaming/state/RocksDBStateStoreProvider.scala | 12 ++++++++---- .../streaming/state/RocksDBStateStoreIntegrationSuite.scala | 3 ++- .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 1 + 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 78a674686b7..c0330f75088 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -393,6 +393,7 @@ class RocksDB( val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem") val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables") val blockCacheUsage = getDBProperty("rocksdb.block-cache-usage") + val pinnedBlocksMemUsage = getDBProperty("rocksdb.block-cache-pinned-usage") // Get the approximate memory usage of this writeBatchWithIndex val writeBatchMemUsage = writeBatch.getWriteBatch.getDataSize val nativeOpsHistograms = Seq( @@ -431,6 +432,7 @@ class RocksDB( numKeysOnLoadedVersion, numKeysOnWritingVersion, readerMemUsage + memTableMemUsage + blockCacheUsage + writeBatchMemUsage, + pinnedBlocksMemUsage, writeBatchMemUsage, totalSSTFilesBytes, nativeOpsLatencyMicros.toMap, @@ -708,6 +710,7 @@ case class RocksDBMetrics( numCommittedKeys: Long, numUncommittedKeys: Long, totalMemUsageBytes: Long, + pinnedBlocksMemUsage: Long, writeBatchMemUsageBytes: Long, totalSSTFilesBytes: Long, nativeOpsHistograms: Map[String, RocksDBNativeHistogram], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 3a128561b1d..b4b648c3693 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -143,7 +143,8 @@ private[sql] class RocksDBStateStoreProvider CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"), CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"), CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"), - CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush") + CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush"), + CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE -> rocksDBMetrics.pinnedBlocksMemUsage ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes => Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map()) @@ -299,8 +300,10 @@ object RocksDBStateStoreProvider { "RocksDB: compaction - total bytes written by the compaction process") val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric( "rocksdbTotalBytesWrittenByFlush", - "RocksDB: flush - total bytes written by flush" - ) + "RocksDB: flush - total bytes written by flush") + val CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE = StateStoreCustomSizeMetric( + "rocksdbPinnedBlocksMemoryUsage", + "RocksDB: memory usage for pinned blocks") // Total SST file size val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric( @@ -315,6 +318,7 @@ object RocksDBStateStoreProvider { CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ, CUSTOM_METRIC_BYTES_WRITTEN, CUSTOM_METRIC_ITERATOR_BYTES_READ, CUSTOM_METRIC_STALL_TIME, CUSTOM_METRIC_TOTAL_COMPACT_TIME, CUSTOM_METRIC_COMPACT_READ_BYTES, - CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, 
CUSTOM_METRIC_FLUSH_WRITTEN_BYTES + CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES, + CUSTOM_METRIC_PINNED_BLOCKS_MEM_USAGE ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index d1af0acb530..345526bb986 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -95,7 +95,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest { "rocksdbReadBlockCacheHitCount", "rocksdbReadBlockCacheMissCount", "rocksdbTotalBytesReadByCompaction", "rocksdbTotalBytesWrittenByCompaction", "rocksdbTotalCompactionLatencyMs", "rocksdbWriterStallLatencyMs", - "rocksdbTotalBytesReadThroughIterator", "rocksdbTotalBytesWrittenByFlush")) + "rocksdbTotalBytesReadThroughIterator", "rocksdbTotalBytesWrittenByFlush", + "rocksdbPinnedBlocksMemoryUsage")) } } finally { query.stop() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 3d45a8868e0..4b368f7f6cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -607,6 +607,7 @@ class RocksDBSuite extends SparkFunSuite { assert(metrics.nativeOpsHistograms("compaction").count > 0) assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0) assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0) + assert(metrics.pinnedBlocksMemUsage >= 0) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org