siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1872195109


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +471,50 @@ class RocksDB(
     this
   }
 
+  /**
+   * Initialize in memory values based on the metadata loaded from DFS.

Review Comment:
   This is too vague. We should be more concrete. We should be explicit what it 
does. It is to open local RocksDB and initialize number of key metrics?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +346,51 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Based on the ground truth lineage loaded from changelog file (lineage), 
this function
+   * does file listing to find all snapshot (version, uniqueId) pairs, and 
finds
+   * the ground truth latest snapshot (version, uniqueId) the db instance 
needs to load.
+   *
+   * @param lineage the ground truth lineage loaded from changelog file
+   * @return the ground truth latest snapshot (version, uniqueId) the db 
instance needs to load

Review Comment:
   Mention that it is sorted by what order and what does mean if the return 
value is None.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,159 @@ class RocksDB(
   // We send snapshots that needs to be uploaded by the maintenance thread to 
this queue
   private val snapshotsToUploadQueue = new 
ConcurrentLinkedQueue[RocksDBSnapshot]()
 
+  /**
+   * Read the lineage from the changelog files. It first get the changelog 
reader
+   * of the correct changelog version and then read the lineage information 
from the file.
+   * The changelog file is named as version_stateStoreCkptId.changelog
+   * @param version version of the changelog file, used to load changelog file.
+   * @param stateStoreCkptId uniqueId of the changelog file, used to load 
changelog file.
+   * @return
+   */
+  private def getLineageFromChangelogFile(
+      version: Long,
+      stateStoreCkptId: Option[String]): Array[LineageItem] = {
+    var changelogReader: StateStoreChangelogReader = null
+    var currLineage: Array[LineageItem] = Array.empty
+    try {
+      changelogReader = fileManager.getChangelogReader(version, 
stateStoreCkptId)
+      currLineage = changelogReader.lineage
+      logInfo(log"Loading lineage: " +
+        log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+        log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+        log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+    } finally {
+      if (changelogReader != null) {
+        changelogReader.closeIfNeeded()
+      }
+    }
+    currLineage
+  }
+
+
   /**
    * Load the given version of data in a native RocksDB instance.
    * Note that this will copy all the necessary file from DFS to local disk as 
needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(
+  private def loadV2(
+        version: Long,
+        stateStoreCkptId: Option[String],
+        readOnly: Boolean = false): RocksDB = {
+  // An array contains lineage information from [snapShotVersion, version] 
(inclusive both end)
+  var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()

Review Comment:
   Can it be moved inside the if below?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -965,7 +966,8 @@ class MicroBatchExecution(
       updateStateStoreCkptId(execCtx, latestExecPlan)
     }
     execCtx.reportTimeTaken("commitOffsets") {
-      if (!commitLog.add(execCtx.batchId, 
CommitMetadata(watermarkTracker.currentWatermark))) {
+      if (!commitLog.add(execCtx.batchId,
+        CommitMetadata(watermarkTracker.currentWatermark, 
currentStateStoreCkptId.toMap))) {

Review Comment:
   @WweiL there must be a misunderstanding here. My previous comment
   > Same as commented above. It is better to fill a None if it is it is V1.
   > Did you validate that for V1, the commit log generated can be read by an 
older version?
   
   is two different comments. Even if validation for V1 is done, I still hope 
None is used for V1, and I've explained it above. See my explanation above 
https://github.com/apache/spark/pull/48355/files#r1870082653
   I didn't see it addressed yet.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1208,40 @@ class RocksDB(
     Option(acquiredThreadInfo).map(_.copy())
   }
 
+  /** Upload the snapshot to DFS and remove it from snapshots pending */
+  private def uploadSnapshot(
+    snapshot: RocksDB#RocksDBSnapshot,
+    fileManager: RocksDBFileManager,
+    snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+    loggingId: String): RocksDBFileManagerMetrics = {
+    var fileManagerMetrics: RocksDBFileManagerMetrics = null
+    try {
+      val uploadTime = timeTakenMs {
+        fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+          snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+          Some(snapshot.columnFamilyMapping), 
Some(snapshot.maxColumnFamilyId), snapshot.uniqueId)
+        fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+
+        val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, 
snapshot.dfsFileSuffix)
+        // We are only removing the uploaded snapshot info from the pending 
set,
+        // to let the file mapping (i.e. query threads) know that the snapshot 
(i.e. and its files)
+        // have been uploaded to DFS. We don't touch the file mapping here to 
avoid corrupting it.
+        snapshotsPendingUpload.remove(snapshotInfo)
+      }
+      lineageManager.resetLineage(lineageManager.getLineage()
+        .filter(i => i.version >= snapshot.version))

Review Comment:
   This is a relatively aggressive cleanup strategy. It is because that even if 
the uploading succeeds, it is not necessarily the one written to the commit 
log. Lineage is relatively cheap, I would keep it for longer if possible, 
perhaps even keeping it until versions are cleaned up by the cleanup 
maintenance task. But here is not wrong, as we can always re-read from 
changelog files. Maybe add a comment for the rational here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +346,51 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Based on the ground truth lineage loaded from changelog file (lineage), 
this function
+   * does file listing to find all snapshot (version, uniqueId) pairs, and 
finds
+   * the ground truth latest snapshot (version, uniqueId) the db instance 
needs to load.
+   *
+   * @param lineage the ground truth lineage loaded from changelog file
+   * @return the ground truth latest snapshot (version, uniqueId) the db 
instance needs to load
+   */
+  def getLatestSnapshotVersionAndUniqueIdFromLineage(
+      lineage: Array[LineageItem]): Option[(Long, Option[String])] = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      fm.list(path, onlyZipFiles)
+        .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+        .filter {
+          case Array(ver, id) => lineage.contains(LineageItem(ver.toLong, id))
+        }
+        .map {
+          case Array(version, uniqueId) => (version.toLong, Option(uniqueId))
+        }
+        .sortBy(_._1)
+        .reverse
+        .headOption
+    } else {
+      Some(0, None)

Review Comment:
   What does (0, None) mean? If it means that the linage starts from 1 and we 
don't have any snapshot uploaded yet, we should use None to indicate that, and 
the return value should be `Option[(Long, String)]. (0, None) is not consistent 
with how we deal with checkpointID. A checkpointID None usually means it is V1 
so checkpointID is not available.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,159 @@ class RocksDB(
   // We send snapshots that needs to be uploaded by the maintenance thread to 
this queue
   private val snapshotsToUploadQueue = new 
ConcurrentLinkedQueue[RocksDBSnapshot]()
 
+  /**
+   * Read the lineage from the changelog files. It first get the changelog 
reader
+   * of the correct changelog version and then read the lineage information 
from the file.
+   * The changelog file is named as version_stateStoreCkptId.changelog
+   * @param version version of the changelog file, used to load changelog file.
+   * @param stateStoreCkptId uniqueId of the changelog file, used to load 
changelog file.
+   * @return
+   */
+  private def getLineageFromChangelogFile(
+      version: Long,
+      stateStoreCkptId: Option[String]): Array[LineageItem] = {
+    var changelogReader: StateStoreChangelogReader = null
+    var currLineage: Array[LineageItem] = Array.empty
+    try {
+      changelogReader = fileManager.getChangelogReader(version, 
stateStoreCkptId)
+      currLineage = changelogReader.lineage
+      logInfo(log"Loading lineage: " +
+        log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+        log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+        log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+    } finally {
+      if (changelogReader != null) {
+        changelogReader.closeIfNeeded()
+      }
+    }
+    currLineage
+  }
+
+
   /**
    * Load the given version of data in a native RocksDB instance.
    * Note that this will copy all the necessary file from DFS to local disk as 
needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(
+  private def loadV2(
+        version: Long,
+        stateStoreCkptId: Option[String],
+        readOnly: Boolean = false): RocksDB = {
+  // An array contains lineage information from [snapShotVersion, version] 
(inclusive both end)
+  var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()
+  try {
+    if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
+        stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+      closeDB(ignoreException = false)
+
+      val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+        // Special handling when version is 0.
+        // When loading the very first version (0), stateStoreCkptId does not 
need to be defined
+        // because there won't be 0.changelog / 0.zip file created in RocksDB 
under v2.
+        if (version == 0) {
+          assert(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty 
when version is zero")
+          (0L, None)
+        // When there is a snapshot file, it is the ground truth, we can skip
+        // reconstructing the lineage from changelog file.
+        } else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
+          currVersionLineage = Array(LineageItem(version, 
stateStoreCkptId.get))
+          lineageManager.resetLineage(currVersionLineage)
+          (version, stateStoreCkptId)
+        } else {
+          currVersionLineage = getLineageFromChangelogFile(version, 
stateStoreCkptId) :+
+            LineageItem(version, stateStoreCkptId.get)
+          currVersionLineage = currVersionLineage.sortBy(_.version)
+          lineageManager.resetLineage(currVersionLineage)
+
+          val latestSnapshotVersionsAndUniqueId =
+            
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+          latestSnapshotVersionsAndUniqueId match {
+            case Some(pair) => pair
+            case None =>
+              logWarning("Cannot find latest snapshot based on lineage: "
+                + printLineageItems(currVersionLineage))
+              (0L, None)

Review Comment:
   The handling looks OK but can you explain why `cannot_load_base_snapshot` is 
removed? We still need to throw it if none of 1-4 is applicable?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +471,50 @@ class RocksDB(
     this
   }
 
+  /**
+   * Initialize in memory values based on the metadata loaded from DFS.
+   * @param metadata: metadata loaded from DFS
+   */
+  private def init(metadata: RocksDBCheckpointMetadata): Unit = {
+
+    setInitialCFInfo()
+    metadata.columnFamilyMapping.foreach { mapping =>
+      colFamilyNameToIdMap.putAll(mapping.asJava)
+    }
+
+    metadata.maxColumnFamilyId.foreach { maxId =>
+      maxColumnFamilyId.set(maxId)
+    }
+    openDB()
+    numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
+      // we don't track the total number of rows - discard the number being 
track
+      -1L
+    } else if (metadata.numKeys < 0) {
+      // we track the total number of rows, but the snapshot doesn't have 
tracking number
+      // need to count keys now
+      countKeys()
+    } else {
+      metadata.numKeys
+    }
+  }
+
+  def load(
+      version: Long,
+      stateStoreCkptId: Option[String] = None,
+      readOnly: Boolean = false): RocksDB = {
+    assert(version >= 0)
+    acquire(LoadStore)
+    recordedMetrics = None
+    logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with 
stateStoreCkptId: ${
+      MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+    if (enableStateStoreCheckpointIds) {

Review Comment:
   Should be when stateStoreCkptId is non-empty, rather than 
`enableStateStoreCheckpointIds`. No matter whether we support V1->V2 
compatibility or not, the source of truth of what we can load is from 
`stateStoreCkptId` loaded from the commit log, not the configuration, which is 
more about what version to be rewritten, nor read. Even if we don't support 
V1->V2, we still should use `stateStoreCkptId` but add check and explicitly 
fail the query, saying it is not supported. This is perhaps already done in 
driver?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +471,50 @@ class RocksDB(
     this
   }
 
+  /**
+   * Initialize in memory values based on the metadata loaded from DFS.
+   * @param metadata: metadata loaded from DFS

Review Comment:
   Personally, I don't like comment that provides little information. Either 
makes it more useful or delete it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,159 @@ class RocksDB(
   // We send snapshots that needs to be uploaded by the maintenance thread to 
this queue
   private val snapshotsToUploadQueue = new 
ConcurrentLinkedQueue[RocksDBSnapshot]()
 
+  /**
+   * Read the lineage from the changelog files. It first get the changelog 
reader
+   * of the correct changelog version and then read the lineage information 
from the file.
+   * The changelog file is named as version_stateStoreCkptId.changelog
+   * @param version version of the changelog file, used to load changelog file.
+   * @param stateStoreCkptId uniqueId of the changelog file, used to load 
changelog file.
+   * @return
+   */
+  private def getLineageFromChangelogFile(
+      version: Long,
+      stateStoreCkptId: Option[String]): Array[LineageItem] = {
+    var changelogReader: StateStoreChangelogReader = null
+    var currLineage: Array[LineageItem] = Array.empty
+    try {
+      changelogReader = fileManager.getChangelogReader(version, 
stateStoreCkptId)
+      currLineage = changelogReader.lineage
+      logInfo(log"Loading lineage: " +
+        log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+        log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+        log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+    } finally {
+      if (changelogReader != null) {
+        changelogReader.closeIfNeeded()
+      }
+    }
+    currLineage
+  }
+
+
   /**
    * Load the given version of data in a native RocksDB instance.
    * Note that this will copy all the necessary file from DFS to local disk as 
needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(
+  private def loadV2(
+        version: Long,
+        stateStoreCkptId: Option[String],
+        readOnly: Boolean = false): RocksDB = {
+  // An array contains lineage information from [snapShotVersion, version] 
(inclusive both end)
+  var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()
+  try {
+    if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
+        stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+      closeDB(ignoreException = false)
+
+      val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+        // Special handling when version is 0.
+        // When loading the very first version (0), stateStoreCkptId does not 
need to be defined
+        // because there won't be 0.changelog / 0.zip file created in RocksDB 
under v2.
+        if (version == 0) {
+          assert(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty 
when version is zero")
+          (0L, None)
+        // When there is a snapshot file, it is the ground truth, we can skip
+        // reconstructing the lineage from changelog file.
+        } else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
+          currVersionLineage = Array(LineageItem(version, 
stateStoreCkptId.get))
+          lineageManager.resetLineage(currVersionLineage)
+          (version, stateStoreCkptId)
+        } else {
+          currVersionLineage = getLineageFromChangelogFile(version, 
stateStoreCkptId) :+
+            LineageItem(version, stateStoreCkptId.get)
+          currVersionLineage = currVersionLineage.sortBy(_.version)
+          lineageManager.resetLineage(currVersionLineage)
+
+          val latestSnapshotVersionsAndUniqueId =
+            
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+          latestSnapshotVersionsAndUniqueId match {
+            case Some(pair) => pair
+            case None =>
+              logWarning("Cannot find latest snapshot based on lineage: "
+                + printLineageItems(currVersionLineage))
+              (0L, None)

Review Comment:
   version 0 should only be used if changelog lineage covers version 1. If it 
doesn't, we should fail the load.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1184,39 @@ class RocksDB(
     Option(acquiredThreadInfo).map(_.copy())
   }
 
+  /** Upload the snapshot to DFS and remove it from snapshots pending */
+  private def uploadSnapshot(
+    snapshot: RocksDB#RocksDBSnapshot,
+    fileManager: RocksDBFileManager,
+    snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+    loggingId: String): RocksDBFileManagerMetrics = {

Review Comment:
   @WweiL Oh I got it. It now needs to access `lineageManager`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to