micheal-o commented on code in PR #47875:
URL: https://github.com/apache/spark/pull/47875#discussion_r1765796047
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -696,54 +682,22 @@ class RocksDB(
} else true
}
- private def uploadSnapshot(): Unit = {
- var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
- val localCheckpoint = synchronized {
- val checkpoint = latestSnapshot
- latestSnapshot = None
-
- // Convert mutable list buffer to immutable to prevent
- // race condition with commit where old snapshot is added
- oldSnapshotsImmutable = oldSnapshots.toList
- oldSnapshots.clear()
-
- checkpoint
- }
- localCheckpoint match {
- case Some(
- RocksDBSnapshot(
- localDir,
- version,
- numKeys,
- capturedFileMappings,
- columnFamilyMapping,
- maxColumnFamilyId)) =>
- try {
- val uploadTime = timeTakenMs {
- fileManager.saveCheckpointToDfs(
- localDir,
- version,
- numKeys,
- capturedFileMappings,
- Some(columnFamilyMapping.toMap),
- Some(maxColumnFamilyId)
- )
- fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
- }
- logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of
version " +
- log"${MDC(LogKeys.VERSION_NUM, version)}," +
- log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
- } finally {
- localCheckpoint.foreach(_.close())
-
- // Clean up old latestSnapshots
- for (snapshot <- oldSnapshotsImmutable) {
- snapshot.close()
- }
-
+ private def uploadSnapshot(snapshot: RocksDBSnapshot): Unit = {
+ try {
+ val uploadTime = timeTakenMs {
+ fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+ snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+ Some(snapshot.columnFamilyMapping),
Some(snapshot.maxColumnFamilyId))
Review Comment:
I have added a comment to explain that. We only remove the uploaded
snapshot info from the pending set to let the file mapping (i.e. the query
threads) know that the snapshot (and its files) has been uploaded to DFS.
`saveCheckpointToDfs` is part of fileManager, and I also don't think we should
move `uploadSnapshot` out, since other snapshot-related functions are here too,
as is the `doMaintenance` code. It would be weird to have only
this function in another place, and I don't want to overdo the refactoring here for now.
The snapshot function is called by both the query thread (on commit when changelog
is off) and the maintenance thread.
Hence I think the explanation I added here should be fine for now.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -696,54 +682,22 @@ class RocksDB(
} else true
}
- private def uploadSnapshot(): Unit = {
- var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
- val localCheckpoint = synchronized {
- val checkpoint = latestSnapshot
- latestSnapshot = None
-
- // Convert mutable list buffer to immutable to prevent
- // race condition with commit where old snapshot is added
- oldSnapshotsImmutable = oldSnapshots.toList
- oldSnapshots.clear()
-
- checkpoint
- }
- localCheckpoint match {
- case Some(
- RocksDBSnapshot(
- localDir,
- version,
- numKeys,
- capturedFileMappings,
- columnFamilyMapping,
- maxColumnFamilyId)) =>
- try {
- val uploadTime = timeTakenMs {
- fileManager.saveCheckpointToDfs(
- localDir,
- version,
- numKeys,
- capturedFileMappings,
- Some(columnFamilyMapping.toMap),
- Some(maxColumnFamilyId)
- )
- fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
- }
- logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of
version " +
- log"${MDC(LogKeys.VERSION_NUM, version)}," +
- log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
- } finally {
- localCheckpoint.foreach(_.close())
-
- // Clean up old latestSnapshots
- for (snapshot <- oldSnapshotsImmutable) {
- snapshot.close()
- }
-
+ private def uploadSnapshot(snapshot: RocksDBSnapshot): Unit = {
+ try {
+ val uploadTime = timeTakenMs {
+ fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+ snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+ Some(snapshot.columnFamilyMapping),
Some(snapshot.maxColumnFamilyId))
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]