micheal-o commented on code in PR #52202:
URL: https://github.com/apache/spark/pull/52202#discussion_r2326477513


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -509,13 +515,19 @@ abstract class SymmetricHashJoinStateManager(
           storeProviderId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
           useColumnFamilies = useVirtualColumnFamilies, storeConf, hadoopConf,
           useMultipleValuesPerKey = false, stateSchemaProvider = None)
-        if (snapshotStartVersion.isDefined) {
+        if (handlerSnapshotOptions.isDefined) {
           if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplay]) {
             throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
               stateStoreProvider.getClass.toString)
           }
+          val opts = handlerSnapshotOptions.get
           stateStoreProvider.asInstanceOf[SupportsFineGrainedReplay]
-            .replayStateFromSnapshot(snapshotStartVersion.get, stateInfo.get.storeVersion)
+            .replayStateFromSnapshot(
+              opts.snapshotVersion,
+              opts.endVersion,
+              readOnly = false,

Review Comment:
   why is `readOnly` false for state reader?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -1280,3 +1298,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 }
+
+/**
+ * Options controlling snapshot-based state replay.

Review Comment:
   nit: include "for state data source reader"



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -681,9 +681,14 @@ class RocksDB(
    *         Note that the instance will be read-only since this method is only used in State Data
    *         Source.
    */
-  def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = {
+  def loadFromSnapshot(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): RocksDB = {

Review Comment:
   nit: update the function's param doc comment to cover the newly added parameters



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -1280,3 +1298,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 }
+
+/**
+ * Options controlling snapshot-based state replay.
+ */
+case class SnapshotOptions(
+    snapshotVersion: Long,
+    endVersion: Long,
+    startKeyToNumValuesStateStoreCkptId: Option[String] = None,
+    startKeyWithIndexToValueStateStoreCkptId: Option[String] = None,
+    endKeyToNumValuesStateStoreCkptId: Option[String] = None,
+    endKeyWithIndexToValueStateStoreCkptId: Option[String] = None) {
+      def getKeyToNumValuesHandlerOpts(): HandlerSnapshotOptions =
+        HandlerSnapshotOptions(
+          snapshotVersion = snapshotVersion,
+          endVersion = endVersion,
+          startStateStoreCkptId = startKeyToNumValuesStateStoreCkptId,
+          endStateStoreCkptId = endKeyToNumValuesStateStoreCkptId)
+
+      def getKeyWithIndexToValueHandlerOpts(): HandlerSnapshotOptions =
+        HandlerSnapshotOptions(
+          snapshotVersion = snapshotVersion,
+          endVersion = endVersion,
+          startStateStoreCkptId = startKeyWithIndexToValueStateStoreCkptId,
+          endStateStoreCkptId = endKeyWithIndexToValueStateStoreCkptId)
+    }
+
+/** Snapshot options specialized for a single state store handler. */
+case class HandlerSnapshotOptions(

Review Comment:
   nit: private?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -801,7 +801,11 @@ trait SupportsFineGrainedReplay {
    * @param endVersion   checkpoint version to end with
    */
   def replayStateFromSnapshot(
-      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
+      snapshotVersion: Long,
+      endVersion: Long,
+      readOnly: Boolean = false,
+      snapshotVersionStateStoreCkptId: Option[String] = None,

Review Comment:
   ditto: update the function's param doc comment to cover the newly added checkpoint ID parameters



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -721,10 +730,16 @@ class RocksDB(
    * @param snapshotVersion start checkpoint version
    * @param endVersion end version
    */
-  private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
+  private def replayFromCheckpoint(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): Any = {

Review Comment:
   ditto: update the function's param doc comment to cover the newly added checkpoint ID parameters



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -813,8 +817,18 @@ trait SupportsFineGrainedReplay {
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion   checkpoint version to end with
    */
-  def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
-    new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion, readOnly = true))
+  def replayReadStateFromSnapshot(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): ReadStateStore = {

Review Comment:
   ditto: update the function's param doc comment to cover the newly added checkpoint ID parameters



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to