WweiL commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1881124818


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1215,43 @@ class RocksDB(
     Option(acquiredThreadInfo).map(_.copy())
   }
 
+  /** Upload the snapshot to DFS and remove it from snapshots pending */
+  private def uploadSnapshot(
+    snapshot: RocksDB#RocksDBSnapshot,
+    fileManager: RocksDBFileManager,
+    snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+    loggingId: String): RocksDBFileManagerMetrics = {
+    var fileManagerMetrics: RocksDBFileManagerMetrics = null
+    try {
+      val uploadTime = timeTakenMs {
+        fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+          snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+          Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId), snapshot.uniqueId)
+        fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+
+        val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, snapshot.dfsFileSuffix)
+        // We are only removing the uploaded snapshot info from the pending set,
+        // to let the file mapping (i.e. query threads) know that the snapshot (i.e. and its files)
+        // have been uploaded to DFS. We don't touch the file mapping here to avoid corrupting it.
+        snapshotsPendingUpload.remove(snapshotInfo)
+      }
+      // This is relative aggressive because that even if the uploading succeeds,
+      // it is not necessarily the one written to the commit log. But we can always load lineage
+      // from commit log so it is fine.
+      lineageManager.resetLineage(lineageManager.getLineageForCurrVersion()
+        .filter(i => i.version >= snapshot.version))

Review Comment:
   Race conditions are possible here, but they are harmless: we only require the first version in the lineage to be a snapshot version, not necessarily the latest snapshot version. So this sequence could happen:
   
   1. In `loadWithCheckpointId()`: `lineage = lineageManager.getLineageForCurrVersion()`
   2. `uploadSnapshot()` runs and calls `lineageManager.resetLineage`
   3. The lineage read in step 1 is written to the changelog file.
   
   The lineage written to the changelog file now contains two snapshot versions. This is fine because later, when we actually call `getLatestSnapshotVersionAndUniqueIdFromLineage`, we list all snapshot files and find the latest one based on the lineage stored in the changelog file. So even if the lineage contains two snapshot versions, we can still find the latest one.
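
To make the argument concrete, here is a minimal sketch of why a stale lineage still resolves to the same snapshot. It is a simplified model, not the actual Spark code: `LineageItem`, `truncate`, and `latestSnapshot` are hypothetical stand-ins, and the set of `(version, uniqueId)` pairs stands in for the listing of snapshot files on DFS.

```scala
// Hypothetical, simplified model of a lineage entry; not Spark's actual class.
final case class LineageItem(version: Long, checkpointUniqueId: String)

object LineageRaceSketch {
  // Same shape as the filter in uploadSnapshot: keep entries at or after
  // the version of the snapshot that was just uploaded.
  def truncate(lineage: Seq[LineageItem], snapshotVersion: Long): Seq[LineageItem] =
    lineage.filter(_.version >= snapshotVersion)

  // Assumed lookup: among the lineage entries that have a snapshot file,
  // pick the one with the highest version.
  def latestSnapshot(
      lineage: Seq[LineageItem],
      snapshotFiles: Set[(Long, String)]): Option[LineageItem] =
    lineage
      .filter(i => snapshotFiles.contains((i.version, i.checkpointUniqueId)))
      .sortBy(_.version)
      .lastOption

  // Lineage as read in step 1, before resetLineage ran. It starts at
  // snapshot version 10 and also contains snapshot version 12.
  val staleLineage: Seq[LineageItem] = Seq(
    LineageItem(10, "a"), LineageItem(11, "b"),
    LineageItem(12, "c"), LineageItem(13, "d"))

  // Lineage after step 2 truncated it to the snapshot uploaded at version 12.
  val freshLineage: Seq[LineageItem] = truncate(staleLineage, 12)

  // Stand-in for the snapshot files listed on DFS.
  val snapshotFiles: Set[(Long, String)] = Set((10L, "a"), (12L, "c"))

  val fromStale: Option[LineageItem] = latestSnapshot(staleLineage, snapshotFiles)
  val fromFresh: Option[LineageItem] = latestSnapshot(freshLineage, snapshotFiles)
}
```

In this model both lineages resolve to the snapshot at version 12, so writing the stale lineage in step 3 costs a few extra entries but does not change which snapshot is loaded.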



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

