This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6259b97855d [SPARK-42792][SS] Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators 6259b97855d is described below commit 6259b97855d623de8cbe1d669ed9fe968bddb197 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Wed Mar 15 15:53:44 2023 +0900 [SPARK-42792][SS] Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators ### What changes were proposed in this pull request? Add support for WRITE_FLUSH_BYTES for RocksDB used in streaming stateful operators ### Why are the changes needed? It's useful to get this metric for bytes written during flush from RocksDB as part of the DB custom metrics. We propose to add this to the existing metrics that are collected. There is no additional overhead since we are just querying the internal ticker gauge, similar to other metrics. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test ``` [info] Run completed in 45 seconds, 260 milliseconds. [info] Total number of tests run: 18 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 18, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. 
[success] Total time: 152 s (02:32), completed Mar 14, 2023, 3:43:41 PM ``` Info log on executor: ``` StateStoreId(opId=0,partId=3,name=default): Committed 2, stats = {"numCommittedKeys":4,"numUncommittedKeys":4,"totalMemUsageBytes":7818,"writeBatchMemUsageBytes":272,"totalSSTFilesBytes":2614,"nativeOpsHistograms":{"get":{"sum":14,"avg":7.0,"stddev":1.0,"median":6.0,"p95":8.0,"p99":8.0,"count":2},"put":{"sum":37966,"avg":37966.0,"stddev":0.0,"median":37966.0,"p95":37966.0,"p99":37966.0,"count":1},"compaction":{"sum":0,"avg":0.0,"stddev":0.0,"median":0.0,"p95":0.0,"p99":0.0,"count":0}},"lastCommitLatencyMs":{"fileSync":188,"writeBatch":37,"flush":61,"pause":0,"checkpoint":61,"compact":0},"filesCopied":1,"bytesCopied":1280,"filesReused":1,"zipFileBytesUncompressed":7675,"nativeOpsMetrics":{"writerStallDuration":0,"totalBytesReadThroughIterator":254,"totalBytesWrittenByFlush":1490,"readBlockCacheHitCount":2,"totalBytesWrittenByCompaction":0,"readBlockCacheMissCount":0,"totalBytesReadByCompaction":0,"totalBytesWritten":272,"totalBytesRead":73}} ``` Info log on driver: ``` "customMetrics" : { "rocksdbBytesCopied" : 2544, "rocksdbCommitCheckpointLatency" : 416, "rocksdbCommitCompactLatency" : 0, "rocksdbCommitFileSyncLatencyMs" : 742, "rocksdbCommitFlushLatency" : 194, "rocksdbCommitPauseLatency" : 0, "rocksdbCommitWriteBatchLatency" : 132, "rocksdbFilesCopied" : 2, "rocksdbFilesReused" : 2, "rocksdbGetCount" : 4, "rocksdbGetLatency" : 0, "rocksdbPutCount" : 5, "rocksdbPutLatency" : 132, "rocksdbReadBlockCacheHitCount" : 4, "rocksdbReadBlockCacheMissCount" : 0, "rocksdbSstFileSize" : 5143, "rocksdbTotalBytesRead" : 138, "rocksdbTotalBytesReadByCompaction" : 0, "rocksdbTotalBytesReadThroughIterator" : 714, "rocksdbTotalBytesWritten" : 548, "rocksdbTotalBytesWrittenByCompaction" : 0, "rocksdbTotalBytesWrittenByFlush" : 2948, "rocksdbTotalCompactionLatencyMs" : 0, "rocksdbWriterStallLatencyMs" : 0, "rocksdbZipFileBytesUncompressed" : 36542 } } ], ``` Closes #40427 from 
anishshri-db/task/SPARK-42792. Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 4 +++- .../execution/streaming/state/RocksDBStateStoreProvider.scala | 9 +++++++-- .../streaming/state/RocksDBStateStoreIntegrationSuite.scala | 2 +- .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 2 ++ 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 4ce4a03822e..89872afb80e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -403,7 +403,9 @@ class RocksDB( /** Number of bytes read during compaction */ "totalBytesReadByCompaction" -> COMPACT_READ_BYTES, /** Number of bytes written during compaction */ - "totalBytesWrittenByCompaction" -> COMPACT_WRITE_BYTES + "totalBytesWrittenByCompaction" -> COMPACT_WRITE_BYTES, + /** Number of bytes written during flush */ + "totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES ).toMap val nativeOpsMetrics = nativeOpsMetricTickers.mapValues { typ => nativeStats.getTickerCount(typ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 79614df6299..3a128561b1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -142,7 +142,8 @@ private[sql] class RocksDBStateStoreProvider CUSTOM_METRIC_STALL_TIME -> 
nativeOpsLatencyMillis("writerStallDuration"), CUSTOM_METRIC_TOTAL_COMPACT_TIME -> sumNativeOpsLatencyMillis("compaction"), CUSTOM_METRIC_COMPACT_READ_BYTES -> nativeOpsMetrics("totalBytesReadByCompaction"), - CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction") + CUSTOM_METRIC_COMPACT_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByCompaction"), + CUSTOM_METRIC_FLUSH_WRITTEN_BYTES -> nativeOpsMetrics("totalBytesWrittenByFlush") ) ++ rocksDBMetrics.zipFileBytesUncompressed.map(bytes => Map(CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED -> bytes)).getOrElse(Map()) @@ -296,6 +297,10 @@ object RocksDBStateStoreProvider { val CUSTOM_METRIC_COMPACT_WRITTEN_BYTES = StateStoreCustomSizeMetric( "rocksdbTotalBytesWrittenByCompaction", "RocksDB: compaction - total bytes written by the compaction process") + val CUSTOM_METRIC_FLUSH_WRITTEN_BYTES = StateStoreCustomSizeMetric( + "rocksdbTotalBytesWrittenByFlush", + "RocksDB: flush - total bytes written by flush" + ) // Total SST file size val CUSTOM_METRIC_SST_FILE_SIZE = StateStoreCustomSizeMetric( @@ -310,6 +315,6 @@ object RocksDBStateStoreProvider { CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ, CUSTOM_METRIC_BYTES_WRITTEN, CUSTOM_METRIC_ITERATOR_BYTES_READ, CUSTOM_METRIC_STALL_TIME, CUSTOM_METRIC_TOTAL_COMPACT_TIME, CUSTOM_METRIC_COMPACT_READ_BYTES, - CUSTOM_METRIC_COMPACT_WRITTEN_BYTES + CUSTOM_METRIC_COMPACT_WRITTEN_BYTES, CUSTOM_METRIC_FLUSH_WRITTEN_BYTES ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index dc505963b4d..d1af0acb530 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -95,7 +95,7 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest { "rocksdbReadBlockCacheHitCount", "rocksdbReadBlockCacheMissCount", "rocksdbTotalBytesReadByCompaction", "rocksdbTotalBytesWrittenByCompaction", "rocksdbTotalCompactionLatencyMs", "rocksdbWriterStallLatencyMs", - "rocksdbTotalBytesReadThroughIterator")) + "rocksdbTotalBytesReadThroughIterator", "rocksdbTotalBytesWrittenByFlush")) } } finally { query.stop() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index cf0ff4808fc..417eff65482 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -544,6 +544,8 @@ class RocksDBSuite extends SparkFunSuite { assert(metrics.nativeOpsMetrics("writerStallDuration") >= 0) assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") >= 0) assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") >=0) + + assert(metrics.nativeOpsMetrics("totalBytesWrittenByFlush") >= 0) } withTempDir { dir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org