dylanwong250 commented on code in PR #52148:
URL: https://github.com/apache/spark/pull/52148#discussion_r2310739951


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -342,6 +342,51 @@ class RocksDB(
     currLineage
   }
 
+  /**
+   * Construct the full lineage from startVersion to endVersion (inclusive) by
+   * walking backwards using lineage information embedded in changelog files.
+   */
+  def getFullLineage(
+      startVersion: Long,
+      endVersion: Long,
+      endVersionStateStoreCkptId: Option[String]): Array[LineageItem] = {
+    assert(startVersion <= endVersion,
+      s"startVersion $startVersion should be less than or equal to endVersion 
$endVersion")
+
+    // A buffer to collect the lineage information, the entries should be 
decreasing in version
+    val buf = mutable.ArrayBuffer[LineageItem]()
+    buf.append(LineageItem(endVersion, endVersionStateStoreCkptId.get))
+
+    while (buf.last.version > startVersion) {
+      val prevSmallestVersion = buf.last.version
+      val lineage = getLineageFromChangelogFile(buf.last.version, 
Some(buf.last.checkpointUniqueId))
+      // lineage array is sorted in increasing order, we need to reverse it
+      val lineageSorted = lineage.filter(_.version >= 
startVersion).sortBy(_.version).reverse

Review Comment:
   Changed to `.sortBy(-_.version)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to