dylanwong250 commented on code in PR #52148: URL: https://github.com/apache/spark/pull/52148#discussion_r2310739951
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ########## @@ -342,6 +342,51 @@ class RocksDB( currLineage } + /** + * Construct the full lineage from startVersion to endVersion (inclusive) by + * walking backwards using lineage information embedded in changelog files. + */ + def getFullLineage( + startVersion: Long, + endVersion: Long, + endVersionStateStoreCkptId: Option[String]): Array[LineageItem] = { + assert(startVersion <= endVersion, + s"startVersion $startVersion should be less than or equal to endVersion $endVersion") + + // A buffer to collect the lineage information, the entries should be decreasing in version + val buf = mutable.ArrayBuffer[LineageItem]() + buf.append(LineageItem(endVersion, endVersionStateStoreCkptId.get)) + + while (buf.last.version > startVersion) { + val prevSmallestVersion = buf.last.version + val lineage = getLineageFromChangelogFile(buf.last.version, Some(buf.last.checkpointUniqueId)) + // lineage array is sorted in increasing order, we need to reverse it + val lineageSorted = lineage.filter(_.version >= startVersion).sortBy(_.version).reverse Review Comment: Changed to `.sortBy(-_.version)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org