anishshri-db commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1638644771


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -116,12 +116,15 @@ case class StateSourceOptions(
     batchId: Long,
     operatorId: Int,
     storeName: String,
-    joinSide: JoinSideValues) {
+    joinSide: JoinSideValues,
+    snapshotStartBatchId: Option[Long],
+    snapshotPartitionId: Option[Int]) {
   def stateCheckpointLocation: Path = new Path(resolvedCpLocation, 
DIR_NAME_STATE)
 
   override def toString: String = {
     s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, 
batchId=$batchId, " +
-      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
+      s"snapshotStartBatchId=$snapshotStartBatchId, 
snapshotPartitionId=$snapshotPartitionId)"

Review Comment:
   maybe log these only if specified ?



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -239,6 +244,11 @@
           "Error reading streaming state file of <fileToRead> does not exist. 
If the stream job is restarted with a new or updated state operation, please 
create a new checkpoint location or clear the existing checkpoint location."
         ]
       },
+      "SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
+        "message" : [
+          "Partition id <snapshotPartitionId> not found for given state 
source."

Review Comment:
   "given state source" might not give enough info. Can we also add some 
additional metadata such as checkpoint location, operator id etc ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -190,7 +195,28 @@ object StateSourceOptions extends DataSourceOptions {
       throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
     }
 
-    StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, 
joinSide)
+    val snapshotStartBatchId = 
Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
+    if (snapshotStartBatchId.exists(_ < 0)) {
+      throw 
StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
+    } else if (snapshotStartBatchId.exists(_ > batchId)) {
+      throw StateDataSourceErrors.invalidOptionValue(
+        SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to 
$batchId")
+    }
+
+    val snapshotPartitionId = 
Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt)
+    if (snapshotPartitionId.exists(_ < 0)) {
+      throw 
StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)

Review Comment:
   Can we also check for partition id being in the valid range ? or we don't 
have the info about total num of partitions here ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -261,6 +261,21 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     new HDFSBackedStateStore(version, newMap)
   }
 
+  /**
+   * Get the state store of endVersion for reading by applying delta files on 
the snapshot of
+   * startVersion. If startVersion does not exist, an error will be thrown.
+   *
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion   checkpoint version to end with

Review Comment:
   Also specify return value passed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to