WweiL commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1880848758


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +352,52 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Based on the ground truth lineage loaded from changelog file (lineage), this function
+   * does file listing to find all snapshot (version, uniqueId) pairs, and finds
+   * the ground truth latest snapshot (version, uniqueId) the db instance needs to load.
+   *
+   * @param lineage The ground truth lineage loaded from changelog file, sorted by id
+   * @return The ground truth latest snapshot (version, uniqueId) the db instance needs to load,
+   *         when the return value is None it means there is no such snapshot found.
+   */
+  def getLatestSnapshotVersionAndUniqueIdFromLineage(
+      lineage: Array[LineageItem]): Option[(Long, String)] = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      fm.list(path, onlyZipFiles)
+        .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+        .filter {
+          case Array(ver, id) => lineage.contains(LineageItem(ver.toLong, id))

Review Comment:
   Yes, then there would be errors... Originally it was implemented in a way to
   handle both v1 and v2, like:
   ```
   .filter {
     case Array(version, _) => xxx
     case Array(version) => xxx
   }
   ```
   cc @siying, who says we should separate the logic.
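
To make the concern concrete: a `.filter { case Array(ver, id) => ... }` block with a single case throws `scala.MatchError` when a file name splits into anything other than two parts, for example a v1-style `5.zip`. Below is a minimal sketch of what "separating the logic" could look like, not the PR's actual implementation. The `SnapshotNames` object, its method names, the `LineageItem` field names, and the assumed file layouts (`<version>.zip` for v1, `<version>_<uniqueId>.zip` for v2) are all hypothetical and only for illustration. It relies on `toLongOption`, which needs Scala 2.13 or later.

```scala
// Hypothetical stand-in for the LineageItem type referenced in the diff.
final case class LineageItem(version: Long, checkpointUniqueId: String)

object SnapshotNames {
  // Assumed v2 layout: "<version>_<uniqueId>.zip".
  // Returns None instead of throwing for names that do not fit.
  def parseV2(fileName: String): Option[(Long, String)] =
    fileName.stripSuffix(".zip").split("_") match {
      case Array(ver, id) => ver.toLongOption.map(v => (v, id))
      case _ => None
    }

  // Assumed v1 layout: "<version>.zip".
  def parseV1(fileName: String): Option[Long] =
    fileName.stripSuffix(".zip").toLongOption

  // Latest v2 snapshot whose (version, uniqueId) pair appears in the lineage.
  // v1-style names are skipped here rather than matched in the same block.
  def latestInLineage(
      fileNames: Seq[String],
      lineage: Seq[LineageItem]): Option[(Long, String)] = {
    val known = lineage.map(i => (i.version, i.checkpointUniqueId)).toSet
    fileNames.flatMap(parseV2).filter(known.contains).sortBy(_._1).lastOption
  }
}
```

With separate parsers, the v2 path ignores v1-style names instead of failing on them, and a v1 path can call `parseV1` on its own without sharing a pattern match.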



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

