liviazhu commented on code in PR #52148:
URL: https://github.com/apache/spark/pull/52148#discussion_r2311440471
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5433,6 +5438,12 @@
     ],
     "sqlState" : "KD006"
   },
+  "STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED" : {
+    "message" : [
+      "Reading state across different checkpoint format versions is not supported. startBatchId=<startBatchId>, endBatchId=<endBatchId>."

Review Comment:
   Can we also include the specific checkpoint format versions involved in this message? I know there are only two now, but we will add more in the future.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -1064,8 +1064,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   override def getStateStoreChangeDataReader(
       startVersion: Long,
       endVersion: Long,
-      colFamilyNameOpt: Option[String] = None):
+      colFamilyNameOpt: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None):
     StateStoreChangeDataReader = {
+
+    if (endVersionStateStoreCkptId.isDefined) {
+      throw QueryExecutionErrors.cannotLoadStore(new SparkException(

Review Comment:
   Can we make a new error condition for this (and change the other place where we do this)?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -342,6 +342,59 @@ class RocksDB(
     currLineage
   }
 
+  /**
+   * Construct the full lineage from startVersion to endVersion (inclusive) by
+   * walking backwards using lineage information embedded in changelog files.
+   */
+  def getFullLineage(

Review Comment:
   Can we add some unit tests for this new function? The logic seems quite complicated, and I want to make sure we test all edge cases, particularly the error cases.
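A minimal, self-contained sketch of the backward walk that the `getFullLineage` doc comment describes, written so that edge cases like those raised in the review (for example equal start and end versions, an inverted range, or a gap in the recorded lineage) can be exercised directly in unit tests. It assumes, hypothetically, that the changelog for each (version, checkpoint ID) pair records ancestor entries with strictly smaller versions. `LineageSketch`, `LineageEntry`, `readLineage`, and the exceptions thrown are illustrative names only, not the actual `RocksDB.getFullLineage` implementation.

```scala
import scala.collection.mutable.ArrayBuffer

object LineageSketch {
  // Hypothetical stand-in for one (version, checkpoint unique ID) pair.
  final case class LineageEntry(version: Long, ckptId: String)

  /**
   * Walks backwards from (endVersion, endCkptId) down to startVersion.
   * `readLineage(entry)` is an assumed hook returning the ancestors recorded in
   * the changelog for `entry`. Returns entries ordered from startVersion to endVersion.
   */
  def getFullLineage(
      startVersion: Long,
      endVersion: Long,
      endCkptId: String,
      readLineage: LineageEntry => Seq[LineageEntry]): Seq[LineageEntry] = {
    require(startVersion <= endVersion,
      s"startVersion ($startVersion) must not exceed endVersion ($endVersion)")
    // Collected newest-first, reversed before returning.
    val newestFirst = ArrayBuffer(LineageEntry(endVersion, endCkptId))
    while (newestFirst.last.version > startVersion) {
      val current = newestFirst.last
      // Keep only ancestors inside the requested range, newest first.
      val ancestors = readLineage(current)
        .filter(_.version >= startVersion)
        .sortBy(e => -e.version)
      if (ancestors.isEmpty) {
        throw new IllegalStateException(
          s"Changelog for version ${current.version} has no lineage back to $startVersion")
      }
      // Each step must go back exactly one version; anything else is a gap
      // (or a bad entry), so the loop always makes progress or fails.
      var expected = current.version - 1
      ancestors.foreach { entry =>
        if (entry.version != expected) {
          throw new IllegalStateException(
            s"Lineage gap: expected version $expected but found ${entry.version}")
        }
        newestFirst += entry
        expected -= 1
      }
    }
    newestFirst.reverse.toSeq
  }
}
```

Passing `readLineage` as a function keeps the walk free of file I/O, so a test can feed it in-memory fake lineages (a complete chain, a chain with a missing version, a chain with duplicate versions) instead of writing real changelog files.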
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1224,17 +1227,32 @@ object RocksDBStateStoreProvider {
 /** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
 class RocksDBStateStoreChangeDataReader(
     fm: CheckpointFileManager,
+    rocksDB: RocksDB,

Review Comment:
   Hm, it seems a little strange to me that we are passing in RocksDB in its entirety just so we can use getFullLineage. Is there a way to abstract out the getFullLineage functionality so we can reuse it in a different way?
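One possible direction, sketched under stated assumptions: have the change data reader depend on a narrow, hypothetical `LineageResolver` trait instead of the whole `RocksDB` instance, so it only sees the one capability it actually uses. `LineageResolver`, `ChangeDataReaderSketch`, and `changelogsToRead` are illustrative names, not existing Spark APIs. The assumption that changelogs without a checkpoint ID (checkpoint format v1) are addressed by version alone, and therefore need no lineage lookup, should be checked against the provider code.

```scala
object LineageDecouplingSketch {
  // Hypothetical (version, checkpoint unique ID) pair.
  final case class LineageEntry(version: Long, ckptId: String)

  // Hypothetical narrow interface: the single capability the reader needs.
  // A production implementation might read changelog lineage through a
  // CheckpointFileManager rather than holding a RocksDB instance.
  trait LineageResolver {
    def fullLineage(startVersion: Long, endVersion: Long, endCkptId: String): Seq[LineageEntry]
  }

  // Hypothetical reader that depends only on LineageResolver.
  final class ChangeDataReaderSketch(
      startVersion: Long,
      endVersion: Long,
      endCkptId: Option[String],
      resolver: LineageResolver) {

    // Changelogs to open, oldest first, as (version, optional checkpoint ID).
    // Assumption: without a checkpoint ID, files are located by version alone.
    def changelogsToRead: Seq[(Long, Option[String])] = endCkptId match {
      case Some(id) =>
        resolver.fullLineage(startVersion, endVersion, id).map(e => (e.version, Some(e.ckptId)))
      case None =>
        (startVersion to endVersion).map(v => (v, None))
    }
  }
}
```

The design choice here is ordinary dependency narrowing: the reader no longer needs a live RocksDB handle, and tests can substitute an in-memory `LineageResolver` fake. This presumes the lineage can be reconstructed from changelog files alone, as the `getFullLineage` doc comment suggests.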