brkyvz commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1879408540


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -151,7 +152,10 @@ class RocksDBFileManager(
   private var minSeenVersion = 1L
 
   @volatile private var rootDirChecked: Boolean = false
-  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, 
Seq[RocksDBImmutableFile]]
+
+  // (version, checkpointUniqueId) -> immutable files
+  private val versionToRocksDBFiles =
+    new ConcurrentHashMap[(Long, Option[String]), Seq[RocksDBImmutableFile]]()

Review Comment:
   nit: I probably would define a UniqueVersion class or something that can be 
used across everything and maybe that replaces LineageItem too. Don't need to 
address, but something to think about



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1215,43 @@ class RocksDB(
     Option(acquiredThreadInfo).map(_.copy())
   }
 
+  /** Upload the snapshot to DFS and remove it from snapshots pending */
+  private def uploadSnapshot(
+    snapshot: RocksDB#RocksDBSnapshot,
+    fileManager: RocksDBFileManager,
+    snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+    loggingId: String): RocksDBFileManagerMetrics = {
+    var fileManagerMetrics: RocksDBFileManagerMetrics = null
+    try {
+      val uploadTime = timeTakenMs {
+        fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+          snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+          Some(snapshot.columnFamilyMapping), 
Some(snapshot.maxColumnFamilyId), snapshot.uniqueId)
+        fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+
+        val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, 
snapshot.dfsFileSuffix)
+        // We are only removing the uploaded snapshot info from the pending 
set,
+        // to let the file mapping (i.e. query threads) know that the snapshot 
(i.e. and its files)
+        // have been uploaded to DFS. We don't touch the file mapping here to 
avoid corrupting it.
+        snapshotsPendingUpload.remove(snapshotInfo)
+      }
+      // This is relative aggressive because that even if the uploading 
succeeds,
+      // it is not necessarily the one written to the commit log. But we can 
always load lineage
+      // from commit log so it is fine.
+      lineageManager.resetLineage(lineageManager.getLineageForCurrVersion()
+        .filter(i => i.version >= snapshot.version))

Review Comment:
   if the upload of snapshots is async, could there be some race conditions 
happening here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +352,52 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Based on the ground truth lineage loaded from changelog file (lineage), 
this function
+   * does file listing to find all snapshot (version, uniqueId) pairs, and 
finds
+   * the ground truth latest snapshot (version, uniqueId) the db instance 
needs to load.
+   *
+   * @param lineage The ground truth lineage loaded from changelog file, 
sorted by id
+   * @return The ground truth latest snapshot (version, uniqueId) the db 
instance needs to load,
+   *         when the return value is None it means ther is no such snapshot 
found.
+   */
+  def getLatestSnapshotVersionAndUniqueIdFromLineage(
+      lineage: Array[LineageItem]): Option[(Long, String)] = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      fm.list(path, onlyZipFiles)
+        .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+        .filter {
+          case Array(ver, id) => lineage.contains(LineageItem(ver.toLong, id))
+        }
+        .map {
+          case Array(version, uniqueId) => (version.toLong, uniqueId)
+        }
+        .sortBy(_._1)
+        .reverse
+        .headOption
+    } else {
+      None
+    }
+  }
 
   /** Get the latest version available in the DFS directory. If no data 
present, it returns 0. */
   def getLatestVersion(): Long = {
     val path = new Path(dfsRootDir)
     if (fm.exists(path)) {
       val files = fm.list(path).map(_.getPath)
-      val changelogFileVersions = files
-        .filter(onlyChangelogFiles.accept)
-        .map(_.getName.stripSuffix(".changelog"))
-        .map(_.toLong)
-      val snapshotFileVersions = files
-        .filter(onlyZipFiles.accept)
-        .map(_.getName.stripSuffix(".zip"))
-        .map(_.toLong)
+      val changelogFileVersions = files.filter(onlyChangelogFiles.accept)
+        .map(_.getName.stripSuffix(".changelog").split("_"))
+        .map {
+          case Array(version, _) => version.toLong
+          case Array(version) => version.toLong
+        }

Review Comment:
   try to combine these map functions to avoid creating intermediate garbage - 
it may harm GC performance for low latency queries



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +352,52 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Based on the ground truth lineage loaded from changelog file (lineage), 
this function
+   * does file listing to find all snapshot (version, uniqueId) pairs, and 
finds
+   * the ground truth latest snapshot (version, uniqueId) the db instance 
needs to load.
+   *
+   * @param lineage The ground truth lineage loaded from changelog file, 
sorted by id
+   * @return The ground truth latest snapshot (version, uniqueId) the db 
instance needs to load,
+   *         when the return value is None it means ther is no such snapshot 
found.
+   */
+  def getLatestSnapshotVersionAndUniqueIdFromLineage(
+      lineage: Array[LineageItem]): Option[(Long, String)] = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      fm.list(path, onlyZipFiles)
+        .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+        .filter {
+          case Array(ver, id) => lineage.contains(LineageItem(ver.toLong, id))

Review Comment:
   does this not return a MatchError if there is no `_` to split on? What if 
you see old files using checkpoint v1?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -648,8 +817,9 @@ class RocksDB(
           // inside the uploadSnapshot() called below.
           // If changelog checkpointing is enabled, snapshot will be uploaded 
asynchronously
           // during state store maintenance.
-          snapshot = Some(createSnapshot(checkpointDir, newVersion,
-            colFamilyNameToIdMap.asScala.toMap, 
maxColumnFamilyId.get().toShort))
+          snapshot = Some(createSnapshot(
+            checkpointDir, newVersion, colFamilyNameToIdMap.asScala.toMap,
+            maxColumnFamilyId.get().toShort, sessionStateStoreCkptId))

Review Comment:
   uber nit: one parameter per line please



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -233,6 +233,11 @@
       "An error occurred during loading state."
     ],
     "subClass" : {
+      "CANNOT_FIND_BASE_SNAPSHOT_CHECKPOINT" : {
+        "message" : [
+          "Cannot find a base snapshot checkpoint. lineage: <lineage>."

Review Comment:
   ```
   Cannot find a base snapshot checkpoint with lineage: <lineage>.
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,162 @@ class RocksDB(
   // We send snapshots that needs to be uploaded by the maintenance thread to 
this queue
   private val snapshotsToUploadQueue = new 
ConcurrentLinkedQueue[RocksDBSnapshot]()
 
+  /**
+   * Read the lineage from the changelog files. It first get the changelog 
reader
+   * of the correct changelog version and then read the lineage information 
from the file.
+   * The changelog file is named as version_stateStoreCkptId.changelog
+   * @param version version of the changelog file, used to load changelog file.
+   * @param stateStoreCkptId uniqueId of the changelog file, used to load 
changelog file.
+   * @return the lineage stored in the changelog file
+   */
+  private def getLineageFromChangelogFile(
+      version: Long,
+      stateStoreCkptId: Option[String]): Array[LineageItem] = {
+    var changelogReader: StateStoreChangelogReader = null
+    var currLineage: Array[LineageItem] = Array.empty
+    try {
+      changelogReader = fileManager.getChangelogReader(version, 
stateStoreCkptId)
+      currLineage = changelogReader.lineage
+      logInfo(log"Loading lineage: " +
+        log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+        log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+        log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+    } finally {
+      if (changelogReader != null) {
+        changelogReader.closeIfNeeded()
+      }
+    }
+    currLineage
+  }
+
+
   /**
    * Load the given version of data in a native RocksDB instance.
    * Note that this will copy all the necessary file from DFS to local disk as 
needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(
+  private def loadWithCheckpointId(
+        version: Long,
+        stateStoreCkptId: Option[String],
+        readOnly: Boolean = false): RocksDB = {
+  // An array contains lineage information from [snapShotVersion, version] 
(inclusive in both ends)
+  var currVersionLineage: Array[LineageItem] = 
lineageManager.getLineageForCurrVersion()
+  try {
+    if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
+        stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+      closeDB(ignoreException = false)
+
+      val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+        // Special handling when version is 0.
+        // When loading the very first version (0), stateStoreCkptId does not 
need to be defined
+        // because there won't be 0.changelog / 0.zip file created in RocksDB 
under v2.
+        if (version == 0) {
+          assert(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty 
when version is zero")
+          (0L, None)
+        // When there is a snapshot file, it is the ground truth, we can skip
+        // reconstructing the lineage from changelog file.
+        } else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
+          currVersionLineage = Array(LineageItem(version, 
stateStoreCkptId.get))
+          (version, stateStoreCkptId)
+        } else {
+          currVersionLineage = getLineageFromChangelogFile(version, 
stateStoreCkptId) :+
+            LineageItem(version, stateStoreCkptId.get)
+          currVersionLineage = currVersionLineage.sortBy(_.version)
+
+          val latestSnapshotVersionsAndUniqueId =
+            
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+          latestSnapshotVersionsAndUniqueId match {
+            case Some(pair) => (pair._1, Option(pair._2))
+            case None if currVersionLineage.head.version == 1L =>
+              logDebug(log"Cannot find latest snapshot based on lineage but 
first version " +
+                log"is 1, use 0 as default. Lineage ${MDC(LogKeys.LINEAGE, 
lineageManager)}")
+              (0L, None)
+            case _ =>
+              throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
+                printLineageItems(currVersionLineage))
+          }
+        }
+      }
+
+      logInfo(log"Loaded latestSnapshotVersion: ${
+        MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)}, 
latestSnapshotUniqueId: ${
+        MDC(LogKeys.UUID, latestSnapshotUniqueId)}")
+
+      val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
+        workingDir, rocksDBFileMapping, latestSnapshotUniqueId)
+
+      loadedVersion = latestSnapshotVersion
+
+      // reset the last snapshot version to the latest available snapshot 
version
+      lastSnapshotVersion = latestSnapshotVersion
+      lineageManager.resetLineage(currVersionLineage)
+
+      // Initialize maxVersion upon successful load from DFS
+      fileManager.setMaxSeenVersion(version)
+
+      openLocalRocksDB(metadata)
+
+      if (loadedVersion != version) {
+        val versionsAndUniqueIds = currVersionLineage
+          .filter(_.version > loadedVersion)
+          .filter(_.version <= version)

Review Comment:
   nit: to reduce intermediate garbage objects, can you squash these into a 
single filter? You can even call `collect` on it and just make it a single 
operation to even get rid of the `map`
   
   ```scala
   currVersionLineage.collect {
     case i if i.version > loadedVersion && i.version <= version => (i.version, 
Option(i.checkpointUniqueId))
   }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1533,3 +1723,40 @@ case class AcquiredThreadInfo(
   }
 }
 
+/**
+ * A helper class to manage the lineage information when checkpoint unique id 
is enabled.
+ * "lineage" is an array of LineageItem (version, uniqueId) pair.
+ *
+ * The first item of "lineage" should normally be the version of a snapshot, 
except
+ * for the first few versions. Because they are solely loaded from changelog 
file.
+ * (i.e. with default minDeltasForSnapshot, there is only 1_uuid1.changelog, 
no 1_uuid1.zip)
+ *
+ * The last item of "lineage" corresponds to one version before the 
to-be-committed version.
+ */
+private[sql] class RocksDBLineageManager {
+  @volatile private var lineage: Array[LineageItem] = Array.empty
+
+  override def toString: String = lineage.map {
+    case LineageItem(version, uuid) => s"$version: $uuid"
+  }.mkString(" ")
+
+  def appendLineageItem(item: LineageItem): Unit = {
+    lineage = lineage :+ item
+  }
+
+  def resetLineage(newLineage: Array[LineageItem]): Unit = {
+    lineage = newLineage
+  }
+
+  def getLineageForCurrVersion(): Array[LineageItem] = {
+    lineage.clone()
+  }
+
+  def contains(item: LineageItem): Boolean = {
+    lineage.contains(item)
+  }

Review Comment:
   how often will this be called and how long can the list be? Do we want to 
keep around a map or set to make this look up faster?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -483,38 +539,43 @@ class RocksDBFileManager(
     val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
     val changelogFiles = allFiles.filter(file => 
onlyChangelogFiles.accept(file))
     // All versions present in DFS, sorted
-    val sortedSnapshotVersions = snapshotFiles
-      .map(_.getName.stripSuffix(".zip"))
-      .map(_.toLong)
-      .sorted
+    val sortedSnapshotVersionsAndUniqueIds = snapshotFiles
+      .map(_.getName.stripSuffix(".zip").split("_"))
+      .map {
+        case Array(version, uniqueId) => (version.toLong, Some(uniqueId))
+        case Array(version) => (version.toLong, None)
+      }

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to