anishshri-db commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1640159504


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -116,12 +116,16 @@ case class StateSourceOptions(
     batchId: Long,
     operatorId: Int,
     storeName: String,
-    joinSide: JoinSideValues) {
+    joinSide: JoinSideValues,
+    snapshotStartBatchId: Option[Long],
+    snapshotPartitionId: Option[Int]) {
   def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)
 
   override def toString: String = {
     s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
-      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide" +
+      snapshotStartBatchId.map(", snapshotStartBatchId=" + _).getOrElse("") +

Review Comment:
   What does this look like when the options are not present? Maybe just use a simple `if else` condition here instead?
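   For reference, with the current `.map(...).getOrElse("")` form an unset option is simply omitted from the string. A minimal sketch of the `if else` shape being suggested (not the PR author's actual code; it may end up formatted differently):

   ```scala
   // Sketch only; the real method lives in StateSourceOptions.
   override def toString: String = {
     val snapshotStartBatchIdStr = if (snapshotStartBatchId.isDefined) {
       s", snapshotStartBatchId=${snapshotStartBatchId.get}"
     } else {
       ""
     }
     val snapshotPartitionIdStr = if (snapshotPartitionId.isDefined) {
       s", snapshotPartitionId=${snapshotPartitionId.get}"
     } else {
       ""
     }
     s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
       s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide" +
       snapshotStartBatchIdStr + snapshotPartitionIdStr + ")"
   }
   ```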



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -190,7 +196,28 @@ object StateSourceOptions extends DataSourceOptions {
       throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
     }
 
-    StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, joinSide)
+    val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
+    if (snapshotStartBatchId.exists(_ < 0)) {
+      throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
+    } else if (snapshotStartBatchId.exists(_ > batchId)) {
+      throw StateDataSourceErrors.invalidOptionValue(
+        SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to $batchId")
+    }
+
+    val snapshotPartitionId = Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt)
+    if (snapshotPartitionId.exists(_ < 0)) {
+      throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
+    }
+
+    if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) {

Review Comment:
   Could we add a comment here explaining why both options are required together?
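   For example, the guard could carry a short rationale inline. A sketch only; the rationale wording is an assumption, and the exact error raised in the PR may differ:

   ```scala
   // snapshotPartitionId selects a single partition of a specific snapshot, so it
   // is only meaningful when snapshotStartBatchId pins down which snapshot to read;
   // reject a partition id that arrives without a start batch id.
   if (snapshotPartitionId.isDefined && snapshotStartBatchId.isEmpty) {
     throw StateDataSourceErrors.invalidOptionValue(
       SNAPSHOT_PARTITION_ID, s"$SNAPSHOT_START_BATCH_ID must also be specified")
   }
   ```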



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -81,9 +81,19 @@ class StateScan(
       assert((tail - head + 1) == partitionNums.length,
        s"No continuous partitions in state: ${partitionNums.mkString("Array(", ", ", ")")}")
 
-      partitionNums.map {
-        pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
-      }.toArray
+      sourceOptions.snapshotPartitionId match {
+        case None => partitionNums.map {
+          pn => new StateStoreInputPartition(pn, queryId, sourceOptions)

Review Comment:
   nit: move `pn =>` to the line above 
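   Concretely, the nit amounts to something like this for the `None` branch (sketch only; the `Some` case is elided here as it is in the quoted diff):

   ```scala
   case None => partitionNums.map { pn =>
     new StateStoreInputPartition(pn, queryId, sourceOptions)
   }.toArray
   ```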



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

