micheal-o commented on code in PR #48685:
URL: https://github.com/apache/spark/pull/48685#discussion_r1826915259
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1115,19 +1117,50 @@ class RocksDBFileMapping {
// Current State Store version which has been loaded.
var currentVersion: Long = 0
- // If the local file (with localFileName) has already been persisted to DFS, returns the
- // DFS file, else returns None.
- // If the currently mapped DFS file was committed in a newer version (or was generated
- // in a version which has not been uploaded to DFS yet), the mapped DFS file is ignored (because
- // it cannot be reused in this version). In this scenario, the local mapping to this DFS file
- // will be cleared, and function will return None.
- def getDfsFile(
+ /**
+ * Get the mapped DFS file for the given local file for a DFS load operation.
+ * If the currently mapped DFS file was mapped in the same or newer version as current db version
+ * (or was generated in a version which has not been uploaded to DFS yet), the mapped DFS file
+ * is ignored. In this scenario, the local mapping to this DFS file will be cleared,
+ * and function will return None.
+ *
+ * @note For same version number, this is because we want to make sure we are using
+ * the latest files in DFS, in case the previous zip file has been overwritten in DFS.
+ *
+ * @return - Option with the DFS file or None
+ */
+ def getDfsFileForLoad(
+ fileManager: RocksDBFileManager,
+ localFileName: String): Option[RocksDBImmutableFile] = {
+ getDfsFileWithVersionCheck(fileManager, localFileName, _ >= currentVersion)
+ }
+
+ /**
+ * Get the mapped DFS file for the given local file for a DFS save (i.e. checkpoint) operation.
+ * If the currently mapped DFS file was mapped in a newer version as current db version
+ * (or was generated in a version which has not been uploaded to DFS yet), the mapped DFS file
+ * is ignored. In this scenario, the local mapping to this DFS file will be cleared,
+ * and function will return None.
+ *
+ * @note For same version number, we can reuse it. e.g. we load(v1) -> save(v2),
+ * the loaded SST files from version 1 can be reused in version 2 upload.
Review Comment:
correct. See my other comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]