liviazhu commented on code in PR #52148:
URL: https://github.com/apache/spark/pull/52148#discussion_r2311440471


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5433,6 +5438,12 @@
     ],
     "sqlState" : "KD006"
   },
+  "STDS_MIXED_CHECKPOINT_FORMAT_VERSIONS_NOT_SUPPORTED" : {
+    "message" : [
+      "Reading state across different checkpoint format versions is not supported. startBatchId=<startBatchId>, endBatchId=<endBatchId>."

Review Comment:
   Can we also include the checkpoint format versions that were encountered 
in this message? I know there are only two now, but we will add more in the 
future.
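
   To illustrate the suggestion, here is a minimal sketch of how the message 
could carry the two format versions. The placeholder names 
`startFormatVersion` and `endFormatVersion`, and the helper object itself, are 
assumptions made for illustration; they are not taken from the PR.

```scala
// Hypothetical sketch: the two format-version placeholders are illustrative
// names, not parameters defined in the PR.
object MixedCheckpointFormatMessage {
  // Template mirroring the message in the diff, extended with two assumed placeholders.
  val template: String =
    "Reading state across different checkpoint format versions is not supported. " +
      "startBatchId=<startBatchId>, endBatchId=<endBatchId>, " +
      "startFormatVersion=<startFormatVersion>, endFormatVersion=<endFormatVersion>."

  // Collect the message parameters as name -> value pairs.
  def parameters(
      startBatchId: Long,
      endBatchId: Long,
      startFormatVersion: Int,
      endFormatVersion: Int): Map[String, String] = Map(
    "startBatchId" -> startBatchId.toString,
    "endBatchId" -> endBatchId.toString,
    "startFormatVersion" -> startFormatVersion.toString,
    "endFormatVersion" -> endFormatVersion.toString)

  // Substitute every <name> placeholder in the template with its value.
  def render(params: Map[String, String]): String =
    params.foldLeft(template) { case (msg, (name, value)) =>
      msg.replace(s"<$name>", value)
    }
}
```

   Reporting the versions as parameters rather than hard-coding them keeps the 
message valid when more format versions are added.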



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -1064,8 +1064,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   override def getStateStoreChangeDataReader(
       startVersion: Long,
       endVersion: Long,
-      colFamilyNameOpt: Option[String] = None):
+      colFamilyNameOpt: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None):
     StateStoreChangeDataReader = {
+
+    if (endVersionStateStoreCkptId.isDefined) {
+      throw QueryExecutionErrors.cannotLoadStore(new SparkException(

Review Comment:
   Can we make a new error condition for this (and change the other place where 
we do this)?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -342,6 +342,59 @@ class RocksDB(
     currLineage
   }
 
+  /**
+   * Construct the full lineage from startVersion to endVersion (inclusive) by
+   * walking backwards using lineage information embedded in changelog files.
+   */
+  def getFullLineage(

Review Comment:
   Can we add some unit tests for this new function? The logic seems quite 
complicated, so I want to make sure we test all edge cases, particularly the 
error cases.
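
   As a rough idea of the cases worth covering, the sketch below models the 
backwards walk described in the doc comment with plain Scala types. 
`LineageItem`, `LineageWalkSketch`, and the `readLineage` function are 
hypothetical stand-ins for the real classes and changelog reads, so this is a 
simplified model under those assumptions, not the PR's implementation.

```scala
// Hypothetical, simplified model of a backwards lineage walk; not Spark's classes.
final case class LineageItem(version: Long, checkpointUniqueId: String)

object LineageWalkSketch {
  /**
   * Builds the lineage for [startVersion, endVersion] (inclusive).
   * `readLineage(version, id)` stands in for reading the lineage embedded in
   * the changelog file of (version, id); it returns None if the file is missing.
   */
  def fullLineage(
      startVersion: Long,
      endVersion: Long,
      endVersionId: String,
      readLineage: (Long, String) => Option[Seq[LineageItem]]): Seq[LineageItem] = {
    require(startVersion <= endVersion,
      s"startVersion=$startVersion must not exceed endVersion=$endVersion")
    var result = List(LineageItem(endVersion, endVersionId))
    // Each pass prepends the ancestors recorded in the oldest known changelog.
    while (result.head.version > startVersion) {
      val oldest = result.head
      val ancestors = readLineage(oldest.version, oldest.checkpointUniqueId)
        .getOrElse(throw new IllegalStateException(s"No changelog found for $oldest"))
        .filter(i => i.version >= startVersion && i.version < oldest.version)
        .sortBy(_.version)
      // Ancestors must be consecutive and end right below the oldest known version.
      val contiguous = ancestors.nonEmpty &&
        ancestors.last.version == oldest.version - 1 &&
        ancestors.zip(ancestors.drop(1)).forall { case (a, b) => b.version == a.version + 1 }
      if (!contiguous) {
        throw new IllegalStateException(s"Lineage gap below version ${oldest.version}")
      }
      result = ancestors.toList ++ result
    }
    result
  }
}
```

   Against a model like this, the edge cases would include: a range resolved 
from a single changelog, a range that needs several hops, 
`startVersion == endVersion`, a missing changelog file, and a gap in the 
recorded lineage.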



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1224,17 +1227,32 @@ object RocksDBStateStoreProvider {
 /** [[StateStoreChangeDataReader]] implementation for [[RocksDBStateStoreProvider]] */
 class RocksDBStateStoreChangeDataReader(
     fm: CheckpointFileManager,
+    rocksDB: RocksDB,

Review Comment:
   Hm, it seems a little strange to me that we are passing in RocksDB here in 
its entirety just so we can use getFullLineage. Is there a way to abstract out 
the getFullLineage functionality so we can reuse it in a different way?
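
   One possible shape for this, sketched with hypothetical names 
(`LineageEntry`, `LineageResolver`, `ChangeDataReaderSketch`): the reader 
depends on a narrow interface that exposes only the lineage lookup, and 
whatever owns that logic implements it. The `getFullLineage` signature below is 
an assumption, since the diff excerpt does not show the real one.

```scala
// Hypothetical sketch: these types are illustrative and do not exist in Spark.
final case class LineageEntry(version: Long, checkpointUniqueId: String)

// Narrow interface exposing only what the reader needs.
trait LineageResolver {
  def getFullLineage(
      startVersion: Long,
      endVersion: Long,
      endVersionCkptId: Option[String]): Seq[LineageEntry]
}

// Stand-in for the change data reader: it depends on the interface,
// not on the whole store object.
class ChangeDataReaderSketch(
    startVersion: Long,
    endVersion: Long,
    endVersionCkptId: Option[String],
    lineageResolver: LineageResolver) {

  // Resolve checkpoint IDs only when an end-version checkpoint ID is supplied;
  // otherwise fall back to plain version numbers.
  lazy val versionsToRead: Seq[(Long, Option[String])] = endVersionCkptId match {
    case Some(_) =>
      lineageResolver
        .getFullLineage(startVersion, endVersion, endVersionCkptId)
        .map(e => (e.version, Option(e.checkpointUniqueId)))
    case None =>
      (startVersion to endVersion).map(v => (v, Option.empty[String]))
  }
}
```

   A side benefit is that the reader can then be unit tested with a stub 
resolver, without constructing a RocksDB instance.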



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

