dylanwong250 commented on code in PR #52148: URL: https://github.com/apache/spark/pull/52148#discussion_r2310742921
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ########## @@ -342,6 +342,51 @@ class RocksDB( currLineage } + /** + * Construct the full lineage from startVersion to endVersion (inclusive) by + * walking backwards using lineage information embedded in changelog files. + */ + def getFullLineage( + startVersion: Long, + endVersion: Long, + endVersionStateStoreCkptId: Option[String]): Array[LineageItem] = { + assert(startVersion <= endVersion, + s"startVersion $startVersion should be less than or equal to endVersion $endVersion") + + // A buffer to collect the lineage information, the entries should be decreasing in version + val buf = mutable.ArrayBuffer[LineageItem]() + buf.append(LineageItem(endVersion, endVersionStateStoreCkptId.get)) + + while (buf.last.version > startVersion) { + val prevSmallestVersion = buf.last.version + val lineage = getLineageFromChangelogFile(buf.last.version, Some(buf.last.checkpointUniqueId)) + // lineage array is sorted in increasing order, we need to reverse it + val lineageSorted = lineage.filter(_.version >= startVersion).sortBy(_.version).reverse + // append to the buffer in reverse order, so the buffer is always decreasing in version + buf.appendAll(lineageSorted) + + // to prevent infinite loop if we make no progress, throw an exception + if (buf.last.version == prevSmallestVersion) { + throw new IllegalStateException(s"Lineage is not complete") Review Comment: Done. Created `INVALID_CHECKPOINT_LINEAGE`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org