micheal-o commented on code in PR #54567:
URL: https://github.com/apache/spark/pull/54567#discussion_r2898579572


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
 
   private def loadEmptyStore(version: Long): Unit = {
     // Use version 0 logic to create empty directory with no SST files
-    val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, 
rocksDBFileMapping, None)
+    val metadata = fetchCheckpointFromDfs(0)
+    // No real snapshot exists at this version; advance loadedVersion to the 
target
+    // so the next commit produces version + 1 rather than 1.

Review Comment:
   Please let's remove this comment; it isn't correct.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
 
   private def loadEmptyStore(version: Long): Unit = {
     // Use version 0 logic to create empty directory with no SST files
-    val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, 
rocksDBFileMapping, None)
+    val metadata = fetchCheckpointFromDfs(0)
+    // No real snapshot exists at this version; advance loadedVersion to the 
target
+    // so the next commit produces version + 1 rather than 1.
     loadedVersion = version
     fileManager.setMaxSeenVersion(version)
     openLocalRocksDB(metadata)
   }
 
+  /**
+   * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot 
version,
+   * and increments [[numCloudLoads]]. Returns the checkpoint metadata.
+   * Callers are responsible for calling [[openLocalRocksDB]], setting
+   * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] 
overrides.
+   */
+  private def fetchCheckpointFromDfs(

Review Comment:
   `loadCheckpointFromDfs`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
 
   private def loadEmptyStore(version: Long): Unit = {
     // Use version 0 logic to create empty directory with no SST files
-    val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, 
rocksDBFileMapping, None)
+    val metadata = fetchCheckpointFromDfs(0)
+    // No real snapshot exists at this version; advance loadedVersion to the 
target
+    // so the next commit produces version + 1 rather than 1.
     loadedVersion = version
     fileManager.setMaxSeenVersion(version)
     openLocalRocksDB(metadata)
   }
 
+  /**
+   * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot 
version,
+   * and increments [[numCloudLoads]]. Returns the checkpoint metadata.
+   * Callers are responsible for calling [[openLocalRocksDB]], setting
+   * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] 
overrides.
+   */
+  private def fetchCheckpointFromDfs(
+      snapshotVersion: Long,
+      uniqueId: Option[String] = None): RocksDBCheckpointMetadata = {
+    numCloudLoads += 1
+    val metadata = fileManager.loadCheckpointFromDfs(
+      snapshotVersion, workingDir, rocksDBFileMapping, uniqueId)
+    loadedVersion = snapshotVersion

Review Comment:
   Honestly, I wouldn't add this extra function; it makes the side effect even more 
difficult to reason about from the calling function. Just set the load flag 
that you are introducing at the source. It is a one-line change.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -680,14 +698,12 @@ class RocksDB(
       override protected def beforeLoad(): Unit = closeDB(ignoreException = 
false)
 
       override protected def loadSnapshotFromCheckpoint(snapshotVersion: 
Long): Unit = {
-        val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion,
-          workingDir, rocksDBFileMapping)
+        val metadata = fetchCheckpointFromDfs(snapshotVersion)

Review Comment:
   This is why your test is seeing a count > 1 when auto-repair is done. Even 
if auto-repair is done, we just care whether a store was loaded from DFS or not.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -680,14 +698,12 @@ class RocksDB(
       override protected def beforeLoad(): Unit = closeDB(ignoreException = 
false)
 
       override protected def loadSnapshotFromCheckpoint(snapshotVersion: 
Long): Unit = {
-        val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion,
-          workingDir, rocksDBFileMapping)
+        val metadata = fetchCheckpointFromDfs(snapshotVersion)
 
-        loadedVersion = snapshotVersion
         // Initialize maxVersion upon successful load from DFS
         fileManager.setMaxSeenVersion(snapshotVersion)
 
-        openLocalRocksDB(remoteMetaData)
+        openLocalRocksDB(metadata)

Review Comment:
   Let's avoid these unnecessary variable renames; they just add unnecessary 
extra lines to the PR.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -114,7 +114,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
               "SnapshotLastUploaded.partition_0_default", 
"rocksdbChangeLogWriterCommitLatencyMs",
               "rocksdbSaveZipFilesLatencyMs", 
"rocksdbLoadFromSnapshotLatencyMs",
               "rocksdbLoadLatencyMs", "rocksdbReplayChangeLogLatencyMs",
-              "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount"))
+              "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount",
+              "rocksdbLoadedFromCloud"))
             
assert(stateOperatorMetrics.customMetrics.get("rocksdbNumSnapshotsAutoRepaired")
 == 0,

Review Comment:
   Let's verify the metric value too.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
     "rocksdbNumReplayChangelogFiles",
     "RocksDB: load - number of change log files replayed")
 
+  val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
 
   private def loadEmptyStore(version: Long): Unit = {
     // Use version 0 logic to create empty directory with no SST files
-    val metadata = fileManager.loadCheckpointFromDfs(0, workingDir, 
rocksDBFileMapping, None)
+    val metadata = fetchCheckpointFromDfs(0)
+    // No real snapshot exists at this version; advance loadedVersion to the 
target
+    // so the next commit produces version + 1 rather than 1.
     loadedVersion = version
     fileManager.setMaxSeenVersion(version)
     openLocalRocksDB(metadata)
   }
 
+  /**
+   * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot 
version,
+   * and increments [[numCloudLoads]]. Returns the checkpoint metadata.
+   * Callers are responsible for calling [[openLocalRocksDB]], setting
+   * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]] 
overrides.

Review Comment:
   Please let's remove these two lines of comment; they are not essential for this 
function.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
     "rocksdbNumReplayChangelogFiles",
     "RocksDB: load - number of change log files replayed")
 
+  val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(
+    "rocksdbLoadedFromCloud",

Review Comment:
   Ditto — fix the name.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -244,6 +244,9 @@ class RocksDB(
   // Was snapshot auto repair performed when loading the current version
   @volatile private var performedSnapshotAutoRepair = false
 
+  // Number of DFS (cloud) fetches performed when loading the current version
+  @volatile private var numCloudLoads = 0L

Review Comment:
   This should never be greater than 1 for a given store instance; hence, it can be 
a boolean. For a given store, the value should be 1 if it was loaded from DFS and 
0 if not — just like `performedSnapshotAutoRepair` above.
   
   Also, let's avoid referring to this as "cloud" — you can see that existing code 
calls it DFS, because Spark can run using storage that isn't cloud-based.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -831,6 +849,7 @@ class RocksDB(
     assert(snapshotVersionStateStoreCkptId.isDefined == 
endVersionStateStoreCkptId.isDefined)
     assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
     recordedMetrics = None
+    numCloudLoads = 0L

Review Comment:
   This function would always load from DFS — basically, it's in the name: "load 
from a snapshot in the DFS".



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########


Review Comment:
   should be 0 for this



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -4073,6 +4075,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
             db.put("e", "4")
             db.commit() // a new snapshot (5.zip) will be created since 
previous one is corrupt
             assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1)
+            // 4.zip was tried and failed (1 load), then 2.zip succeeded (2 
loads)
+            assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 2)

Review Comment:
   incorrect



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -4085,6 +4089,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
             assert(toStr(db.get("b")) == "1")
             db.commit()
             assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1)
+            // 5.zip failed (1), 4.zip failed (2), 2.zip failed (3), then 
version 0 succeeded (4)
+            assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 4)

Review Comment:
   incorrect



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
     "rocksdbNumReplayChangelogFiles",
     "RocksDB: load - number of change log files replayed")
 
+  val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(
+    "rocksdbLoadedFromCloud",
+    "RocksDB: load - number of times state was loaded from cloud storage")

Review Comment:
   Ditto — let's avoid using "cloud"; use: `from external storage`.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2920,6 +2920,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
 
         val m1 = db.metricsOpt.get
         assert(m1.loadMetrics("load") > 0)
+        assert(m1.loadMetrics("numCloudLoads") === 1)

Review Comment:
   I don't see any other test where you are also verifying that the load count is 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to