micheal-o commented on code in PR #47875:
URL: https://github.com/apache/spark/pull/47875#discussion_r1803892694


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1007,10 +997,147 @@ class RocksDB(
     }
   }
 
+  override protected def logName: String = s"${super.logName} $loggingId"
+}
+
+object RocksDB extends Logging {
+
+  /** Upload the snapshot to DFS and remove it from snapshots pending */
+  private def uploadSnapshot(
+      snapshot: RocksDB#RocksDBSnapshot,
+      fileManager: RocksDBFileManager,
+      snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+      loggingId: String): RocksDBFileManagerMetrics = {
+    var fileManagerMetrics: RocksDBFileManagerMetrics = null
+    try {
+      val uploadTime = timeTakenMs {
+        fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+          snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+          Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId))
+        fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+
+        val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, 
snapshot.dfsFileSuffix)
+        // We are only removing the uploaded snapshot info from the pending 
set,
+        // to let the file mapping (i.e. query threads) know that the snapshot 
(and its files)
+        // has been uploaded to DFS. We don't touch the file mapping here to 
avoid corrupting it.
+        snapshotsPendingUpload.remove(snapshotInfo)
+      }
+      logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of 
version " +
+        log"${MDC(LogKeys.VERSION_NUM, snapshot.version)}," +
+        log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
+    } finally {
+      snapshot.close()
+    }
+
+    fileManagerMetrics
+  }
+
   /** Records the duration of running `body` for the next query progress 
update. */
-  protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
+  private def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
+}
 
-  override protected def logName: String = s"${super.logName} $loggingId"
+// Uniquely identifies a snapshot. Multiple snapshots created for the same 
version will
+// use a different dfsFilesUUID, and hence will have different 
RocksDBVersionSnapshotInfo
+case class RocksDBVersionSnapshotInfo(version: Long, dfsFilesUUID: String)
+
+// Encapsulates a RocksDB immutable file, and the information whether it has 
been previously
+// uploaded to DFS. Already uploaded files can be skipped during SST file 
upload.
+case class RocksDBSnapshotFile(immutableFile: RocksDBImmutableFile, 
isUploaded: Boolean)
+
+// Encapsulates the mapping of local SST files to DFS files. This mapping 
prevents
+// re-uploading the same SST file multiple times to DFS, saving I/O and 
reducing snapshot
+// upload time. During version load, if a DFS file is already present on local 
file system,
+// it will be reused.
+// This mapping should only be updated using the Task thread - at version load 
and commit time.
+// If same mapping instance is updated from different threads, it will result 
in undefined behavior
+// (and most likely incorrect mapping state).
+class RocksDBFileMapping {
+
+  // Maps a local SST file to the DFS version and DFS file.
+  private val localFileMappings: mutable.Map[String, (Long, 
RocksDBImmutableFile)] =
+    mutable.HashMap[String, (Long, RocksDBImmutableFile)]()
+
+  // Keeps track of all snapshots which have not been uploaded yet. This 
prevents Spark
+  // from reusing SST files which have not yet been persisted to DFS.
+  val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = 
ConcurrentHashMap.newKeySet()
+
+  // Current State Store version which has been loaded.
+  var currentVersion: Long = 0
+
+  // If the local file (with localFileName) has already been persisted to DFS, 
returns the
+  // DFS file, else returns None.
+  // If the currently mapped DFS file was committed in a newer version (or was 
generated
+  // in a version which has not been uploaded to DFS yet), the mapped DFS file 
is ignored (because
+  // it cannot be reused in this version). In this scenario, the local mapping 
to this DFS file
+  // will be cleared, and function will return None.
+  def getDfsFile(
+      fileManager: RocksDBFileManager,
+      localFileName: String): Option[RocksDBImmutableFile] = {
+    localFileMappings.get(localFileName).map { case (dfsFileCommitVersion, 
dfsFile) =>
+      val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
+      val versionSnapshotInfo = 
RocksDBVersionSnapshotInfo(dfsFileCommitVersion, dfsFileSuffix)
+      if (dfsFileCommitVersion >= currentVersion ||
+        snapshotsPendingUpload.contains(versionSnapshotInfo)) {
+        // the mapped dfs file cannot be used, delete from mapping
+        localFileMappings.remove(localFileName)

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to