micheal-o commented on code in PR #54567:
URL: https://github.com/apache/spark/pull/54567#discussion_r2898579572
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
private def loadEmptyStore(version: Long): Unit = {
// Use version 0 logic to create empty directory with no SST files
- val metadata = fileManager.loadCheckpointFromDfs(0, workingDir,
rocksDBFileMapping, None)
+ val metadata = fetchCheckpointFromDfs(0)
+ // No real snapshot exists at this version; advance loadedVersion to the
target
+ // so the next commit produces version + 1 rather than 1.
Review Comment:
Please let's remove this comment — it isn't correct.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
private def loadEmptyStore(version: Long): Unit = {
// Use version 0 logic to create empty directory with no SST files
- val metadata = fileManager.loadCheckpointFromDfs(0, workingDir,
rocksDBFileMapping, None)
+ val metadata = fetchCheckpointFromDfs(0)
+ // No real snapshot exists at this version; advance loadedVersion to the
target
+ // so the next commit produces version + 1 rather than 1.
loadedVersion = version
fileManager.setMaxSeenVersion(version)
openLocalRocksDB(metadata)
}
+ /**
+ * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot
version,
+ * and increments [[numCloudLoads]]. Returns the checkpoint metadata.
+ * Callers are responsible for calling [[openLocalRocksDB]], setting
+ * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]]
overrides.
+ */
+ private def fetchCheckpointFromDfs(
Review Comment:
`loadCheckpointFromDfs`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
private def loadEmptyStore(version: Long): Unit = {
// Use version 0 logic to create empty directory with no SST files
- val metadata = fileManager.loadCheckpointFromDfs(0, workingDir,
rocksDBFileMapping, None)
+ val metadata = fetchCheckpointFromDfs(0)
+ // No real snapshot exists at this version; advance loadedVersion to the
target
+ // so the next commit produces version + 1 rather than 1.
loadedVersion = version
fileManager.setMaxSeenVersion(version)
openLocalRocksDB(metadata)
}
+ /**
+ * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot
version,
+ * and increments [[numCloudLoads]]. Returns the checkpoint metadata.
+ * Callers are responsible for calling [[openLocalRocksDB]], setting
+ * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]]
overrides.
+ */
+ private def fetchCheckpointFromDfs(
+ snapshotVersion: Long,
+ uniqueId: Option[String] = None): RocksDBCheckpointMetadata = {
+ numCloudLoads += 1
+ val metadata = fileManager.loadCheckpointFromDfs(
+ snapshotVersion, workingDir, rocksDBFileMapping, uniqueId)
+ loadedVersion = snapshotVersion
Review Comment:
Honestly, I wouldn't add this extra function — it makes the side effect even more
difficult to reason about from the calling function. Just set the load flag
you are introducing at the source; it is a one-line change.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -680,14 +698,12 @@ class RocksDB(
override protected def beforeLoad(): Unit = closeDB(ignoreException =
false)
override protected def loadSnapshotFromCheckpoint(snapshotVersion:
Long): Unit = {
- val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion,
- workingDir, rocksDBFileMapping)
+ val metadata = fetchCheckpointFromDfs(snapshotVersion)
Review Comment:
This is why your test sees a count > 1 when auto-repair is done. Even
if auto-repair runs, we only care whether a store was loaded from DFS or not.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -680,14 +698,12 @@ class RocksDB(
override protected def beforeLoad(): Unit = closeDB(ignoreException =
false)
override protected def loadSnapshotFromCheckpoint(snapshotVersion:
Long): Unit = {
- val remoteMetaData = fileManager.loadCheckpointFromDfs(snapshotVersion,
- workingDir, rocksDBFileMapping)
+ val metadata = fetchCheckpointFromDfs(snapshotVersion)
- loadedVersion = snapshotVersion
// Initialize maxVersion upon successful load from DFS
fileManager.setMaxSeenVersion(snapshotVersion)
- openLocalRocksDB(remoteMetaData)
+ openLocalRocksDB(metadata)
Review Comment:
Let's avoid these unnecessary variable renames; they just add unnecessary
extra lines to the PR.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala:
##########
@@ -114,7 +114,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
"SnapshotLastUploaded.partition_0_default",
"rocksdbChangeLogWriterCommitLatencyMs",
"rocksdbSaveZipFilesLatencyMs",
"rocksdbLoadFromSnapshotLatencyMs",
"rocksdbLoadLatencyMs", "rocksdbReplayChangeLogLatencyMs",
- "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount"))
+ "rocksdbNumReplayChangelogFiles", "rocksdbForceSnapshotCount",
+ "rocksdbLoadedFromCloud"))
assert(stateOperatorMetrics.customMetrics.get("rocksdbNumSnapshotsAutoRepaired")
== 0,
Review Comment:
Let's verify the metric value too.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
"rocksdbNumReplayChangelogFiles",
"RocksDB: load - number of change log files replayed")
+ val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(
Review Comment:
ditto
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -601,12 +601,30 @@ class RocksDB(
private def loadEmptyStore(version: Long): Unit = {
// Use version 0 logic to create empty directory with no SST files
- val metadata = fileManager.loadCheckpointFromDfs(0, workingDir,
rocksDBFileMapping, None)
+ val metadata = fetchCheckpointFromDfs(0)
+ // No real snapshot exists at this version; advance loadedVersion to the
target
+ // so the next commit produces version + 1 rather than 1.
loadedVersion = version
fileManager.setMaxSeenVersion(version)
openLocalRocksDB(metadata)
}
+ /**
+ * Fetches a snapshot from DFS, sets [[loadedVersion]] to the snapshot
version,
+ * and increments [[numCloudLoads]]. Returns the checkpoint metadata.
+ * Callers are responsible for calling [[openLocalRocksDB]], setting
+ * [[lastSnapshotVersion]], and any load-path-specific [[loadedVersion]]
overrides.
Review Comment:
Please let's remove these two lines of comment — they are not essential for this
function.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
"rocksdbNumReplayChangelogFiles",
"RocksDB: load - number of change log files replayed")
+ val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(
+ "rocksdbLoadedFromCloud",
Review Comment:
ditto, fix name
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -244,6 +244,9 @@ class RocksDB(
// Was snapshot auto repair performed when loading the current version
@volatile private var performedSnapshotAutoRepair = false
+ // Number of DFS (cloud) fetches performed when loading the current version
+ @volatile private var numCloudLoads = 0L
Review Comment:
This should never be greater than 1 for a given store instance, so it can be
a boolean: for a given store, 1 if it was loaded from DFS, 0 if not — just
like `performedSnapshotAutoRepair` above.
Also, let's avoid referring to this as "cloud"; you'll see that the existing
code calls it DFS, because Spark can run on storage that isn't cloud-based.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -831,6 +849,7 @@ class RocksDB(
assert(snapshotVersionStateStoreCkptId.isDefined ==
endVersionStateStoreCkptId.isDefined)
assert(snapshotVersion >= 0 && endVersion >= snapshotVersion)
recordedMetrics = None
+ numCloudLoads = 0L
Review Comment:
this function would always load from DFS. Basically it is in the name, "load
from a snapshot in the DFS"
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
Review Comment:
should be 0 for this
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -4073,6 +4075,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
db.put("e", "4")
db.commit() // a new snapshot (5.zip) will be created since
previous one is corrupt
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1)
+ // 4.zip was tried and failed (1 load), then 2.zip succeeded (2
loads)
+ assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 2)
Review Comment:
incorrect
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -4085,6 +4089,8 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
assert(toStr(db.get("b")) == "1")
db.commit()
assert(db.metricsOpt.get.numSnapshotsAutoRepaired == 1)
+ // 5.zip failed (1), 4.zip failed (2), 2.zip failed (3), then
version 0 succeeded (4)
+ assert(db.metricsOpt.get.loadMetrics("numCloudLoads") === 4)
Review Comment:
incorrect
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1367,6 +1368,10 @@ object RocksDBStateStoreProvider {
"rocksdbNumReplayChangelogFiles",
"RocksDB: load - number of change log files replayed")
+ val CUSTOM_METRIC_LOADED_FROM_CLOUD = StateStoreCustomSumMetric(
+ "rocksdbLoadedFromCloud",
+ "RocksDB: load - number of times state was loaded from cloud storage")
Review Comment:
Ditto — let's avoid using "cloud"; use `from external storage` instead.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2920,6 +2920,7 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures
with SharedSparkSession
val m1 = db.metricsOpt.get
assert(m1.loadMetrics("load") > 0)
+ assert(m1.loadMetrics("numCloudLoads") === 1)
Review Comment:
I don't see any other test where you also verify that this load metric is 0.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]