siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1837426396
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -204,6 +210,9 @@ class RocksDBFileManager(
case _ =>
throw QueryExecutionErrors.invalidChangeLogReaderVersion(changelogVersion)
}
+ if (checkpointUniqueId.isDefined) {
+ changelogReader.lineage
Review Comment:
The way this is written is confusing. Can you improve it?
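For example, something along these lines might be clearer (just a sketch; `changelogReader.lineage` is from the diff, the binding and comment are assumed):

```scala
if (checkpointUniqueId.isDefined) {
  // Lineage is only meaningful under checkpoint format v2, i.e. when a
  // unique id accompanies the version, so read it inside the guard and
  // bind it to a name instead of leaving a bare expression.
  val lineage = changelogReader.lineage
  // ... validate or consume `lineage` here ...
}
```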
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1184,39 @@ class RocksDB(
Option(acquiredThreadInfo).map(_.copy())
}
+ /** Upload the snapshot to DFS and remove it from snapshots pending */
+ private def uploadSnapshot(
+ snapshot: RocksDB#RocksDBSnapshot,
+ fileManager: RocksDBFileManager,
+ snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+ loggingId: String): RocksDBFileManagerMetrics = {
Review Comment:
Can you explain why the function needs to be moved?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -268,28 +280,30 @@ class RocksDBFileManager(
def loadCheckpointFromDfs(
version: Long,
localDir: File,
- rocksDBFileMapping: RocksDBFileMapping): RocksDBCheckpointMetadata = {
- logInfo(log"Loading checkpoint files for version
${MDC(LogKeys.VERSION_NUM, version)}")
+ rocksDBFileMapping: RocksDBFileMapping,
+ checkpointUniqueId: Option[String] = None): RocksDBCheckpointMetadata = {
+ logInfo(log"Loading checkpoint files for version
${MDC(LogKeys.VERSION_NUM, version)} " +
+ log"checkpointUniqueId: ${MDC(LogKeys.UUID,
checkpointUniqueId.getOrElse(""))}")
// The unique ids of SST files are checked when opening a rocksdb instance. The SST files
// in larger versions can't be reused even if they have the same size and name because
// they belong to another rocksdb instance.
- versionToRocksDBFiles.keySet().removeIf(_ >= version)
+ versionToRocksDBFiles.keySet().removeIf(_._1 >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
listRocksDBFiles(localDir)._2.foreach(_.delete())
- Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localDir)
+ Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir)
// Copy the necessary immutable files
val metadataFile = localMetadataFile(localDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM,
version)}:\n" +
log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}")
loadImmutableFilesFromDfs(metadata.immutableFiles, localDir, rocksDBFileMapping, version)
- versionToRocksDBFiles.put(version, metadata.immutableFiles)
+ versionToRocksDBFiles.put((version, checkpointUniqueId), metadata.immutableFiles)
Review Comment:
Since you touched `versionToRocksDBFiles`, which is tricky, can you ask Michael to take a look too?
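To make the keying change concrete, here is a minimal standalone sketch (types simplified: the real map holds immutable-file metadata, not strings):

```scala
import java.util.concurrent.ConcurrentHashMap

object VersionMapSketch {
  // Keyed by (version, optional checkpoint unique id) instead of version.
  val versionToRocksDBFiles =
    new ConcurrentHashMap[(Long, Option[String]), Seq[String]]()

  def main(args: Array[String]): Unit = {
    versionToRocksDBFiles.put((9L, None), Seq("a.sst"))
    versionToRocksDBFiles.put((10L, Some("uuid-1")), Seq("b.sst"))

    // Invalidation on reload matches on the version component only,
    // which is why the diff's predicate is `_._1 >= version`.
    versionToRocksDBFiles.keySet().removeIf(k => k._1 >= 10L)

    println(versionToRocksDBFiles) // only the (9, None) entry remains
  }
}
```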
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -322,14 +349,18 @@ class RocksDBFileManager(
val path = new Path(dfsRootDir)
if (fm.exists(path)) {
val files = fm.list(path).map(_.getPath)
- val changelogFileVersions = files
- .filter(onlyChangelogFiles.accept)
- .map(_.getName.stripSuffix(".changelog"))
- .map(_.toLong)
- val snapshotFileVersions = files
- .filter(onlyZipFiles.accept)
- .map(_.getName.stripSuffix(".zip"))
- .map(_.toLong)
+ val changelogFileVersions = files.filter(onlyChangelogFiles.accept)
+ .map(_.getName.stripSuffix(".changelog").split("_"))
+ .map {
+ case Array(version, _) => version.toLong
+ case Array(version) => version.toLong
+ }
+ val snapshotFileVersions = files.filter(onlyZipFiles.accept)
+ .map(_.getName.stripSuffix(".zip").split("_"))
+ .map {
+ case Array(version, _) => version.toLong
+ case Array(version) => version.toLong
+ }
Review Comment:
Btw, this is an expensive call and I wonder what we use it for. I hope it is only used in unit tests or outside the state store.
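For reference, the filename convention this parsing implies, as a small standalone sketch; note that both match arms return the same value, so they could collapse into a single `case Array(version, _*)` arm:

```scala
object FileNameParseSketch {
  // "<version>.changelog" (format v1) or "<version>_<uniqueId>.changelog"
  // (format v2): either way the version is the first "_"-separated token.
  def parseVersion(name: String, suffix: String): Long =
    name.stripSuffix(suffix).split("_") match {
      case Array(version, _*) => version.toLong
    }

  def main(args: Array[String]): Unit = {
    println(parseVersion("42.changelog", ".changelog"))        // 42
    println(parseVersion("42_0a1b2c.changelog", ".changelog")) // 42
  }
}
```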
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -299,20 +313,33 @@ class RocksDBFileManager(
}
// Get latest snapshot version <= version
- def getLatestSnapshotVersion(version: Long): Long = {
+ def getLatestSnapshotVersionAndUniqueId(
+ version: Long, checkpointUniqueId: Option[String] = None): Array[(Long, Option[String])] = {
val path = new Path(dfsRootDir)
if (fm.exists(path)) {
// If the latest version snapshot exists, we avoid listing.
- if (fm.exists(dfsBatchZipFile(version))) {
- return version
+ if (fm.exists(dfsBatchZipFile(version, checkpointUniqueId))) {
+ return Array((version, checkpointUniqueId))
+ } else if (fm.exists(dfsBatchZipFile(version))) {
+ // This is possible when the state was previously run under checkpoint format v1
+ // and restarted with v2. Then even if there is checkpointUniqueId passed in, the file
Review Comment:
If we previously ran v1, there won't be a `checkpointUniqueId` available. So if `checkpointUniqueId` is available, there should not be a v1 path.
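To spell out the fallback the diff implements, a condensed standalone sketch (`zipName` and `findSnapshot` are made-up names; the real code goes through `fm.exists` and `dfsBatchZipFile`):

```scala
object ZipNameFallbackSketch {
  // v2 snapshot names embed the unique id; v1 names do not.
  def zipName(version: Long, uniqueId: Option[String] = None): String =
    uniqueId.map(id => s"${version}_$id.zip").getOrElse(s"$version.zip")

  // Try the exact v2 name first, then fall back to the v1 name.
  def findSnapshot(
      existing: Set[String],
      version: Long,
      uniqueId: Option[String]): Option[(Long, Option[String])] = {
    if (existing.contains(zipName(version, uniqueId))) {
      Some((version, uniqueId))
    } else if (existing.contains(zipName(version))) {
      Some((version, None)) // v1 file left over from before the migration
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    println(findSnapshot(Set("5.zip"), 5L, Some("uuid-1"))) // Some((5,None))
  }
}
```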
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -299,20 +313,33 @@ class RocksDBFileManager(
}
// Get latest snapshot version <= version
- def getLatestSnapshotVersion(version: Long): Long = {
+ def getLatestSnapshotVersionAndUniqueId(
+ version: Long, checkpointUniqueId: Option[String] = None): Array[(Long, Option[String])] = {
val path = new Path(dfsRootDir)
if (fm.exists(path)) {
// If the latest version snapshot exists, we avoid listing.
- if (fm.exists(dfsBatchZipFile(version))) {
- return version
+ if (fm.exists(dfsBatchZipFile(version, checkpointUniqueId))) {
+ return Array((version, checkpointUniqueId))
+ } else if (fm.exists(dfsBatchZipFile(version))) {
+ // This is possible when the state was previously run under checkpoint format v1
+ // and restarted with v2. Then even if there is checkpointUniqueId passed in, the file
+ // does not have that uniqueId in the filename.
+ return Array((version, None))
}
- fm.list(path, onlyZipFiles)
- .map(_.getPath.getName.stripSuffix(".zip"))
- .map(_.toLong)
- .filter(_ <= version)
- .foldLeft(0L)(math.max)
+ val versionAndUniqueIds = fm.list(path, onlyZipFiles)
+ .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+ .filter {
+ case Array(ver, _) => ver.toLong <= version
+ case Array(ver) => ver.toLong <= version
+ }
+ .map {
+ case Array(version, uniqueId) => (version.toLong, Option(uniqueId))
+ case Array(version) => (version.toLong, None)
+ }
+ val maxVersion = versionAndUniqueIds.map(_._1).foldLeft(0L)(math.max)
+ versionAndUniqueIds.filter(_._1 == maxVersion)
Review Comment:
As mentioned in another comment, if we can't find a snapshot with the specific checkpoint ID, we should skip snapshotting for this version.
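To illustrate the listing path's selection, a standalone sketch (names assumed; the real code operates on `FileStatus` paths from `fm.list`):

```scala
object LatestSnapshotSketch {
  // Parse "<version>[_<uniqueId>].zip", keep versions <= the target,
  // and return every candidate at the maximum version found.
  def latestSnapshots(
      names: Seq[String], version: Long): Seq[(Long, Option[String])] = {
    val parsed = names
      .map(_.stripSuffix(".zip").split("_"))
      .collect {
        case Array(v, id) => (v.toLong, Option(id))
        case Array(v) => (v.toLong, None)
      }
      .filter(_._1 <= version)
    val maxVersion = parsed.map(_._1).foldLeft(0L)(math.max)
    parsed.filter(_._1 == maxVersion)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("3.zip", "5_uuid-a.zip", "5_uuid-b.zip", "7.zip")
    println(latestSnapshots(files, 6))
    // List((5,Some(uuid-a)), (5,Some(uuid-b)))
  }
}
```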