micheal-o commented on code in PR #47875:
URL: https://github.com/apache/spark/pull/47875#discussion_r1803892694
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1007,10 +997,147 @@ class RocksDB(
}
}
+ override protected def logName: String = s"${super.logName} $loggingId"
+}
+
+object RocksDB extends Logging {
+
+ /** Upload the snapshot to DFS and remove it from snapshots pending */
+ private def uploadSnapshot(
+ snapshot: RocksDB#RocksDBSnapshot,
+ fileManager: RocksDBFileManager,
+ snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+ loggingId: String): RocksDBFileManagerMetrics = {
+ var fileManagerMetrics: RocksDBFileManagerMetrics = null
+ try {
+ val uploadTime = timeTakenMs {
+ fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+ snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+ Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId))
+ fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+
+ val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, snapshot.dfsFileSuffix)
+ // We are only removing the uploaded snapshot info from the pending set,
+ // to let the file mapping (i.e. query threads) know that the snapshot (i.e. and its files)
+ // have been uploaded to DFS. We don't touch the file mapping here to avoid corrupting it.
+ snapshotsPendingUpload.remove(snapshotInfo)
+ }
+ logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " +
+ log"${MDC(LogKeys.VERSION_NUM, snapshot.version)}," +
+ log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms")
+ } finally {
+ snapshot.close()
+ }
+
+ fileManagerMetrics
+ }
+
/** Records the duration of running `body` for the next query progress update. */
- protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
+ private def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
+}
- override protected def logName: String = s"${super.logName} $loggingId"
+// uniquely identifies a Snapshot. Multiple snapshots created for same version will
+// use a different dfsFilesUUID, and hence will have different RocksDBVersionSnapshotInfo
+case class RocksDBVersionSnapshotInfo(version: Long, dfsFilesUUID: String)
+
+// Encapsulates a RocksDB immutable file, and the information whether it has been previously
+// uploaded to DFS. Already uploaded files can be skipped during SST file upload.
+case class RocksDBSnapshotFile(immutableFile: RocksDBImmutableFile, isUploaded: Boolean)
+
+// Encapsulates the mapping of local SST files to DFS files. This mapping prevents
+// re-uploading the same SST file multiple times to DFS, saving I/O and reducing snapshot
+// upload time. During version load, if a DFS file is already present on local file system,
+// it will be reused.
+// This mapping should only be updated using the Task thread - at version load and commit time.
+// If same mapping instance is updated from different threads, it will result in undefined behavior
+// (and most likely incorrect mapping state).
+class RocksDBFileMapping {
+
+ // Maps a local SST file to the DFS version and DFS file.
+ private val localFileMappings: mutable.Map[String, (Long, RocksDBImmutableFile)] =
+ mutable.HashMap[String, (Long, RocksDBImmutableFile)]()
+
+ // Keeps track of all snapshots which have not been uploaded yet. This prevents Spark
+ // from reusing SST files which have not been yet persisted to DFS,
+ val snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo] = ConcurrentHashMap.newKeySet()
+
+ // Current State Store version which has been loaded.
+ var currentVersion: Long = 0
+
+ // If the local file (with localFileName) has already been persisted to DFS, returns the
+ // DFS file, else returns None.
+ // If the currently mapped DFS file was committed in a newer version (or was generated
+ // in a version which has not been uploaded to DFS yet), the mapped DFS file is ignored (because
+ // it cannot be reused in this version). In this scenario, the local mapping to this DFS file
+ // will be cleared, and function will return None.
+ def getDfsFile(
+ fileManager: RocksDBFileManager,
+ localFileName: String): Option[RocksDBImmutableFile] = {
+ localFileMappings.get(localFileName).map { case (dfsFileCommitVersion, dfsFile) =>
+ val dfsFileSuffix = fileManager.dfsFileSuffix(dfsFile)
+ val versionSnapshotInfo = RocksDBVersionSnapshotInfo(dfsFileCommitVersion, dfsFileSuffix)
+ if (dfsFileCommitVersion >= currentVersion ||
+ snapshotsPendingUpload.contains(versionSnapshotInfo)) {
+ // the mapped dfs file cannot be used, delete from mapping
+ localFileMappings.remove(localFileName)
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]