vkorukanti commented on a change in pull request #33455:
URL: https://github.com/apache/spark/pull/33455#discussion_r674029730
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
##########
@@ -106,22 +106,46 @@ private[sql] class RocksDBStateStoreProvider
override def metrics: StateStoreMetrics = {
val rocksDBMetrics = rocksDB.metrics
def commitLatencyMs(typ: String): Long =
rocksDBMetrics.lastCommitLatencyMs.getOrElse(typ, 0L)
- def avgNativeOpsLatencyMs(typ: String): Long = {
- rocksDBMetrics.nativeOpsLatencyMicros.get(typ).map(_.avg).getOrElse(0.0).toLong
+ def nativeOpsLatencyMillis(typ: String): Long = {
+ rocksDBMetrics.nativeOpsMetrics.get(typ).map(_ * 1000).getOrElse(0)
Review comment:
it's actually `nativeOpsMetrics` (the ticker-based metrics)
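
For context, a minimal sketch (not the PR's code) of the two kinds of native statistics being distinguished here, using RocksDB's Java API: histogram types such as `DB_GET` carry latency distributions read via `getHistogramData`, while ticker types such as `BLOCK_CACHE_MISS` are plain cumulative counters read via `getTickerCount`. The database path is illustrative.

```scala
import org.rocksdb.{HistogramType, Options, RocksDB, Statistics, TickerType}

object NativeStatsSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()
    val stats = new Statistics()
    val options = new Options().setCreateIfMissing(true).setStatistics(stats)
    val db = RocksDB.open(options, "/tmp/rocksdb-stats-sketch") // illustrative path
    try {
      db.put("k".getBytes, "v".getBytes)
      db.get("k".getBytes)

      // Histogram-based metric: a per-operation latency distribution.
      val getLatency = stats.getHistogramData(HistogramType.DB_GET)
      println(s"avg get latency (us): ${getLatency.getAverage}")

      // Ticker-based metric: a cumulative count, read with getTickerCount.
      val cacheMisses = stats.getTickerCount(TickerType.BLOCK_CACHE_MISS)
      println(s"block cache misses: $cacheMisses")
    } finally {
      db.close()
      options.close()
    }
  }
}
```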
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
##########
@@ -331,10 +335,35 @@ class RocksDB(
val totalSSTFilesBytes = getDBProperty("rocksdb.total-sst-files-size")
val readerMemUsage = getDBProperty("rocksdb.estimate-table-readers-mem")
val memTableMemUsage = getDBProperty("rocksdb.size-all-mem-tables")
- val nativeOps = Seq("get" -> DB_GET, "put" -> DB_WRITE).toMap
- val nativeOpsLatencyMicros = nativeOps.mapValues { typ =>
+ val nativeOpsHistograms = Seq(
+ "get" -> DB_GET,
+ "put" -> DB_WRITE,
+ "compaction" -> COMPACTION_TIME
+ ).toMap
+ val nativeOpsLatencyMicros = nativeOpsHistograms.mapValues { typ =>
RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
}
+ val nativeOpsMetricTickers = Seq(
+ /** Number of cache misses that required reading from local disk */
+ "readBlockCacheMissCount" -> BLOCK_CACHE_MISS,
+ /** Number of cache hits that read data from RocksDB block cache avoiding local disk read */
+ "readBlockCacheHitCount" -> BLOCK_CACHE_HIT,
+ /** Number of uncompressed bytes read (from memtables/cache/sst) from DB::Get() */
+ "totalBytesReadByGet" -> BYTES_READ,
+ /** Number of uncompressed bytes issued by DB::{Put(), Delete(), Merge(), Write()} */
+ "totalBytesWrittenByPut" -> BYTES_WRITTEN,
+ /** The number of uncompressed bytes read from an iterator. */
+ "totalBytesReadThroughIterator" -> ITER_BYTES_READ,
Review comment:
We use the iterator when processing the watermark or timeouts to clean
up old records, which currently reads the complete state. This is
tracked separately from the `get` calls issued while processing the input.
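
As an illustration of the access pattern described above, here is a hypothetical sketch of watermark-based cleanup: expired records are found by scanning the whole state with a RocksDB iterator (the reads that `ITER_BYTES_READ` counts), separate from the point `get` lookups done per input row. The value layout and the helper names (`timestampOf`, `evictOlderThan`) are assumptions for illustration, not code from the PR.

```scala
import java.nio.ByteBuffer
import org.rocksdb.RocksDB

object WatermarkCleanupSketch {
  // Assume each value ends with an 8-byte big-endian event-time timestamp
  // (illustrative layout, not the actual state encoding).
  private def timestampOf(value: Array[Byte]): Long =
    ByteBuffer.wrap(value, value.length - 8, 8).getLong

  /** Full-scan eviction of records older than the watermark. */
  def evictOlderThan(db: RocksDB, watermarkMs: Long): Long = {
    var evicted = 0L
    val iter = db.newIterator() // full scan; this drives ITER_BYTES_READ
    try {
      iter.seekToFirst()
      while (iter.isValid) {
        if (timestampOf(iter.value()) < watermarkMs) {
          db.delete(iter.key())
          evicted += 1
        }
        iter.next()
      }
    } finally iter.close()
    evicted
  }
}
```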
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]