viirya commented on a change in pull request #33455:
URL: https://github.com/apache/spark/pull/33455#discussion_r673775315



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -366,6 +366,100 @@ class RocksDBSuite extends SparkFunSuite {
     // scalastyle:on line.size.limit
   }
 
+  test("reset RocksDB metrics whenever a new version is loaded") {
+    def verifyMetrics(putCount: Long, getCount: Long, iterCountPositive: Boolean = false,
+                      metrics: RocksDBMetrics): Unit = {
+      assert(metrics.nativeOpsHistograms("put").count === putCount, "invalid 
put count")
+      assert(metrics.nativeOpsHistograms("get").count === getCount, "invalid 
get count")
+      if (iterCountPositive) {
+        assert(metrics.nativeOpsMetrics("totalBytesReadThroughIterator") > 0)
+      } else {
+        assert(metrics.nativeOpsMetrics("totalBytesReadThroughIterator") === 0)
+      }
+
+      // most of the time, get reads from the WriteBatch, which is not counted in this metric
+      assert(metrics.nativeOpsMetrics("totalBytesReadByGet") >= 0)
+      assert(metrics.nativeOpsMetrics("totalBytesWrittenByPut") >= putCount * 
1)
+
+      assert(metrics.nativeOpsHistograms("compaction") != null)
+      assert(metrics.nativeOpsMetrics("readBlockCacheMissCount") >= 0)
+      assert(metrics.nativeOpsMetrics("readBlockCacheHitCount") >= 0)
+
+      assert(metrics.nativeOpsMetrics("writerStallDuration") >= 0)
+      assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") >= 0)
+      assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") >=0)
+    }
+
+    withTempDirectory { dir =>
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir) { db =>
+        verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
+        db.load(0)
+        db.put("a", "1") // put also triggers a db get
+        db.get("a") // this is found in-memory writebatch - no get triggered 
in db
+        db.get("b") // key doesn't exists - triggers db get
+        db.commit()
+        verifyMetrics(putCount = 1, getCount = 2, metrics = db.metrics)
+
+        db.load(1)
+        db.put("b", "2") // put also triggers a db get
+        db.get("a") // not found in-memory writebatch, so triggers a db get
+        db.get("c") // key doesn't exists - triggers db get
+        assert(iterator(db).toSet === Set(("a", "1"), ("b", "2")))
+        db.commit()
+        verifyMetrics(putCount = 1, getCount = 3, iterCountPositive = true, db.metrics)
+      }
+    }
+
+    // disable resetting stats
+    withTempDirectory { dir =>
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = RocksDBConf().copy(resetStatsOnLoad = false)) { db =>
+        verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
+        db.load(0)
+        db.put("a", "1") // put also triggers a db get
+        db.commit()
+        // put and get counts are cumulative
+        verifyMetrics(putCount = 1, getCount = 1, metrics = db.metrics)
+
+        db.load(1)
+        db.put("b", "2") // put also triggers a db get
+        db.get("a")
+        db.commit()
+        // put and get counts are cumulative: existing put=1, get=1; this load adds put=1, get=2
+        verifyMetrics(putCount = 2, getCount = 3, metrics = db.metrics)
+      }
+    }
+
+    // force compaction and check the compaction metrics
+    withTempDirectory { dir =>
+      val remoteDir = dir.getCanonicalPath
+      withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
+        db.load(0)
+        db.put("a", "5")
+        db.put("b", "5")
+        db.commit()
+
+        db.load(1)
+        db.put("a", "10")
+        db.put("b", "25")
+        db.commit()
+
+        val metrics = db.metrics
+        assert(metrics.nativeOpsHistograms("compaction").count > 0)
+        assert(metrics.nativeOpsMetrics("totalBytesReadByCompaction") > 0)
+        assert(metrics.nativeOpsMetrics("totalBytesWrittenByCompaction") > 0)
+      }
+    }
+  }
+
+  private def withTempDirectory(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir()
+    try f(dir) finally {
+      Utils.deleteRecursively(dir)
+    }
+  }

Review comment:
       Why do we need this? Can't we reuse withTempDir?
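
       For reference: if I remember right, SparkFunSuite (which this suite extends) already defines a withTempDir helper with this same File => Unit shape, so the private copy looks redundant. A minimal sketch of what the call sites could look like, assuming the inherited helper behaves like the duplicate above:

```scala
// Hypothetical rewrite: reuse the withTempDir inherited from SparkFunSuite
// instead of redefining it locally; the test body stays unchanged.
withTempDir { dir =>
  val remoteDir = dir.getCanonicalPath
  withDB(remoteDir) { db =>
    // ... metric assertions as in the test above ...
  }
}
```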




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


