anishshri-db commented on code in PR #52148:
URL: https://github.com/apache/spark/pull/52148#discussion_r2308808140


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -342,6 +342,51 @@ class RocksDB(
     currLineage
   }
 
+  /**
+   * Construct the full lineage from startVersion to endVersion (inclusive) by
+   * walking backwards using lineage information embedded in changelog files.
+   */
+  def getFullLineage(
+      startVersion: Long,
+      endVersion: Long,
+      endVersionStateStoreCkptId: Option[String]): Array[LineageItem] = {
+    assert(startVersion <= endVersion,
+      s"startVersion $startVersion should be less than or equal to endVersion 
$endVersion")
+
+    // A buffer to collect the lineage information, the entries should be 
decreasing in version
+    val buf = mutable.ArrayBuffer[LineageItem]()
+    buf.append(LineageItem(endVersion, endVersionStateStoreCkptId.get))
+
+    while (buf.last.version > startVersion) {
+      val prevSmallestVersion = buf.last.version
+      val lineage = getLineageFromChangelogFile(buf.last.version, 
Some(buf.last.checkpointUniqueId))
+      // lineage array is sorted in increasing order, we need to reverse it
+      val lineageSorted = lineage.filter(_.version >= 
startVersion).sortBy(_.version).reverse
+      // append to the buffer in reverse order, so the buffer is always 
decreasing in version
+      buf.appendAll(lineageSorted)
+
+      // to prevent infinite loop if we make no progress, throw an exception
+      if (buf.last.version == prevSmallestVersion) {
+        throw new IllegalStateException(s"Lineage is not complete")
+      }
+    }
+
+    // we return the lineage in increasing order
+    val ret = buf.reverse.toArray
+
+    // Sanity checks
+    assert(ret.head.version == startVersion,
+      s"Expected first lineage version to be $startVersion, but got 
${ret.head.version}")
+    assert(ret.last.version == endVersion,
+      s"Expected last lineage version to be $endVersion, but got 
${ret.last.version}")
+    // Assert that the lineage array is strictly increasing in version
+    assert(ret.sliding(2).forall {

Review Comment:
   Maybe move this check to an error class as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to