anishshri-db commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1638644771
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -116,12 +116,15 @@ case class StateSourceOptions(
batchId: Long,
operatorId: Int,
storeName: String,
- joinSide: JoinSideValues) {
+ joinSide: JoinSideValues,
+ snapshotStartBatchId: Option[Long],
+ snapshotPartitionId: Option[Int]) {
def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)
override def toString: String = {
s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
- s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+ s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
+ s"snapshotStartBatchId=$snapshotStartBatchId, snapshotPartitionId=$snapshotPartitionId)"
Review Comment:
Maybe log these only if they are specified?
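Roughly something like this (just a sketch against the fields in this case class; tweak as you see fit):
```scala
override def toString: String = {
  // Append the snapshot options only when the caller actually specified them.
  val snapshotStr =
    snapshotStartBatchId.map(id => s", snapshotStartBatchId=$id").getOrElse("") +
      snapshotPartitionId.map(id => s", snapshotPartitionId=$id").getOrElse("")
  s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, batchId=$batchId, " +
    s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide$snapshotStr)"
}
```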
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -239,6 +244,11 @@
"Error reading streaming state file of <fileToRead> does not exist.
If the stream job is restarted with a new or updated state operation, please
create a new checkpoint location or clear the existing checkpoint location."
]
},
+ "SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
+ "message" : [
+ "Partition id <snapshotPartitionId> not found for given state
source."
Review Comment:
"given state source" might not give enough info. Can we also add some
additional metadata such as checkpoint location, operator id etc ?
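For example, something along these lines (the extra message parameters are only a suggestion; the corresponding values would need to be passed in wherever the error is raised):
```json
"SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
  "message" : [
    "Partition id <snapshotPartitionId> not found for state of operator <operatorId> at checkpoint location <checkpointLocation>."
  ]
}
```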
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -190,7 +195,28 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
}
- StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, joinSide)
+ val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
+ if (snapshotStartBatchId.exists(_ < 0)) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
+ } else if (snapshotStartBatchId.exists(_ > batchId)) {
+ throw StateDataSourceErrors.invalidOptionValue(
+ SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to $batchId")
+ }
+
+ val snapshotPartitionId = Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt)
+ if (snapshotPartitionId.exists(_ < 0)) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
Review Comment:
Can we also check that the partition id is within the valid range? Or do we not have the info about the total number of partitions here?
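If the partition count is available at this point (or wherever we end up doing the check), a rough sketch of what I mean, with `numPartitions` as a placeholder:
```scala
// Hypothetical range check; `numPartitions` stands in for however we obtain the
// operator's number of state store partitions (e.g. from the state metadata).
snapshotPartitionId.foreach { partitionId =>
  if (partitionId < 0 || partitionId >= numPartitions) {
    throw StateDataSourceErrors.invalidOptionValue(
      SNAPSHOT_PARTITION_ID,
      s"value should be in the range [0, ${numPartitions - 1}]")
  }
}
```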
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -261,6 +261,21 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
new HDFSBackedStateStore(version, newMap)
}
+ /**
+ * Get the state store of endVersion for reading by applying delta files on the snapshot of
+ * startVersion. If startVersion does not exist, an error will be thrown.
+ *
+ * @param startVersion checkpoint version of the snapshot to start with
+ * @param endVersion checkpoint version to end with
Review Comment:
Also specify the returned value in the doc comment.
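Something along these lines perhaps (wording is just a suggestion; adjust to the method's actual return type):
```scala
 * @param startVersion checkpoint version of the snapshot to start with
 * @param endVersion   checkpoint version to end with
 * @return state store at version `endVersion`, built by replaying delta files from
 *         `startVersion + 1` through `endVersion` on top of the snapshot at `startVersion`
```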
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]