micheal-o commented on code in PR #52202: URL: https://github.com/apache/spark/pull/52202#discussion_r2326477513
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -509,13 +515,19 @@ abstract class SymmetricHashJoinStateManager(
           storeProviderId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
           useColumnFamilies = useVirtualColumnFamilies, storeConf, hadoopConf,
           useMultipleValuesPerKey = false, stateSchemaProvider = None)
-        if (snapshotStartVersion.isDefined) {
+        if (handlerSnapshotOptions.isDefined) {
           if (!stateStoreProvider.isInstanceOf[SupportsFineGrainedReplay]) {
             throw StateStoreErrors.stateStoreProviderDoesNotSupportFineGrainedReplay(
               stateStoreProvider.getClass.toString)
           }
+          val opts = handlerSnapshotOptions.get
           stateStoreProvider.asInstanceOf[SupportsFineGrainedReplay]
-            .replayStateFromSnapshot(snapshotStartVersion.get, stateInfo.get.storeVersion)
+            .replayStateFromSnapshot(
+              opts.snapshotVersion,
+              opts.endVersion,
+              readOnly = false,

Review Comment:
why is `readOnly` false for state reader?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -1280,3 +1298,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 }
+
+/**
+ * Options controlling snapshot-based state replay.

Review Comment:
nit: include "for state data source reader"

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -681,9 +681,14 @@ class RocksDB(
    * Note that the instance will be read-only since this method is only used in State Data
    * Source.
    */
-  def loadFromSnapshot(snapshotVersion: Long, endVersion: Long): RocksDB = {
+  def loadFromSnapshot(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): RocksDB = {

Review Comment:
nit: update func param comment

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -1280,3 +1298,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 }
+
+/**
+ * Options controlling snapshot-based state replay.
+ */
+case class SnapshotOptions(
+    snapshotVersion: Long,
+    endVersion: Long,
+    startKeyToNumValuesStateStoreCkptId: Option[String] = None,
+    startKeyWithIndexToValueStateStoreCkptId: Option[String] = None,
+    endKeyToNumValuesStateStoreCkptId: Option[String] = None,
+    endKeyWithIndexToValueStateStoreCkptId: Option[String] = None) {
+  def getKeyToNumValuesHandlerOpts(): HandlerSnapshotOptions =
+    HandlerSnapshotOptions(
+      snapshotVersion = snapshotVersion,
+      endVersion = endVersion,
+      startStateStoreCkptId = startKeyToNumValuesStateStoreCkptId,
+      endStateStoreCkptId = endKeyToNumValuesStateStoreCkptId)
+
+  def getKeyWithIndexToValueHandlerOpts(): HandlerSnapshotOptions =
+    HandlerSnapshotOptions(
+      snapshotVersion = snapshotVersion,
+      endVersion = endVersion,
+      startStateStoreCkptId = startKeyWithIndexToValueStateStoreCkptId,
+      endStateStoreCkptId = endKeyWithIndexToValueStateStoreCkptId)
+  }
+
+/** Snapshot options specialized for a single state store handler. */
+case class HandlerSnapshotOptions(

Review Comment:
nit: private?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -801,7 +801,11 @@ trait SupportsFineGrainedReplay {
    * @param endVersion checkpoint version to end with
    */
   def replayStateFromSnapshot(
-      snapshotVersion: Long, endVersion: Long, readOnly: Boolean = false): StateStore
+      snapshotVersion: Long,
+      endVersion: Long,
+      readOnly: Boolean = false,
+      snapshotVersionStateStoreCkptId: Option[String] = None,

Review Comment:
ditto

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -721,10 +730,16 @@ class RocksDB(
    * @param snapshotVersion start checkpoint version
    * @param endVersion end version
    */
-  private def replayFromCheckpoint(snapshotVersion: Long, endVersion: Long): Any = {
+  private def replayFromCheckpoint(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): Any = {

Review Comment:
ditto

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -813,8 +817,18 @@ trait SupportsFineGrainedReplay {
    * @param snapshotVersion checkpoint version of the snapshot to start with
    * @param endVersion checkpoint version to end with
    */
-  def replayReadStateFromSnapshot(snapshotVersion: Long, endVersion: Long): ReadStateStore = {
-    new WrappedReadStateStore(replayStateFromSnapshot(snapshotVersion, endVersion, readOnly = true))
+  def replayReadStateFromSnapshot(
+      snapshotVersion: Long,
+      endVersion: Long,
+      snapshotVersionStateStoreCkptId: Option[String] = None,
+      endVersionStateStoreCkptId: Option[String] = None): ReadStateStore = {

Review Comment:
ditto

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org