eason-yuchen-liu commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1638823883


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -544,6 +595,38 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     result
   }
 
+  private def loadMap(startVersion: Long, endVersion: Long): 
HDFSBackedStateStoreMap = {
+
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val startVersionMap =
+        synchronized { Option(loadedMaps.get(startVersion)) }
+              .orElse{
+                logWarning(
+                  log"The state for version ${MDC(LogKeys.FILE_VERSION, 
startVersion)} doesn't " +
+                  log"exist in loadedMaps. Reading snapshot file and delta 
files if needed..." +
+                  log"Note that this is normal for the first batch of starting 
query.")
+                readSnapshotFile(startVersion)}
+      if (startVersionMap.isEmpty) {
+        throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
+          snapshotFile(startVersion), toString(), null)
+      }
+      synchronized { putStateIntoStateCacheMap(startVersion, 
startVersionMap.get) }

Review Comment:
   For HDFS, it is hard because the common part is really small. But for 
RocksDB, there is room for refactoring. For example, this PR tests 
whether we can extract a common part of both `load` functions: 
https://github.com/apache/spark/pull/46927



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends 
StateDataSourceTestBase {
   }
 }
 
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+  extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {

Review Comment:
   On second thought, I don't think it is necessary to use a templated class 
here. I have reverted the change, and also made sure there is only one place 
defining the state store provider.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -309,19 +309,51 @@ private[sql] class RocksDBStateStoreProvider
     }
   }
 
+  override def getStore(startVersion: Long, endVersion: Long): StateStore = {
+    try {
+      if (startVersion < 1) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+      }
+      if (endVersion < startVersion) {

Review Comment:
   This should be the correct one.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -194,6 +195,79 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
       }
     }
   }
+
+  test("ERROR: snapshotStartBatchId specified to negative") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, -1)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+    }
+  }
+
+  test("ERROR: snapshotPartitionId specified to negative") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, -1)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+    }
+  }
+
+  test("ERROR: snapshotStartBatchId specified without snapshotPartitionId or 
vice versa") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+    }
+
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+    }
+  }
+
+  test("ERROR: snapshotStartBatchId is greater than snapshotEndBatchId") {
+    withTempDir { tempDir =>
+      val startBatchId = 1
+      val endBatchId = 0
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)

Review Comment:
   Do you mean the comment or the code? The code cannot be removed; otherwise 
it will try to look for the max value of batch id available and an error will 
be thrown since nothing is there. The tests above use this a lot.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -190,7 +195,28 @@ object StateSourceOptions extends DataSourceOptions {
       throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
     }
 
-    StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, 
joinSide)
+    val snapshotStartBatchId = 
Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
+    if (snapshotStartBatchId.exists(_ < 0)) {
+      throw 
StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
+    } else if (snapshotStartBatchId.exists(_ > batchId)) {
+      throw StateDataSourceErrors.invalidOptionValue(
+        SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to 
$batchId")
+    }
+
+    val snapshotPartitionId = 
Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt)
+    if (snapshotPartitionId.exists(_ < 0)) {
+      throw 
StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)

Review Comment:
   Partition info is not available here, because this function needs to return 
instantly. The number of partitions is figured out during planning.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -285,6 +315,27 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     }
   }
 
+  private def getLoadedMapForStore(startVersion: Long, endVersion: Long):
+    HDFSBackedStateStoreMap = synchronized {
+    try {
+      if (startVersion < 1) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+      }
+      if (endVersion < startVersion || endVersion < 0) {

Review Comment:
   You are right. Should only be `endVersion < startVersion`.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -239,6 +244,11 @@
           "Error reading streaming state file of <fileToRead> does not exist. 
If the stream job is restarted with a new or updated state operation, please 
create a new checkpoint location or clear the existing checkpoint location."
         ]
       },
+      "SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
+        "message" : [
+          "Partition id <snapshotPartitionId> not found for given state 
source."

Review Comment:
   Sure.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2159,6 +2159,18 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
       cause = null)
   }
 
+  def failedToReadSnapshotFileNotExistsError(
+      fileToRead: Path,
+      clazz: String,
+      f: Throwable): Throwable = {
+    new SparkException(
+      errorClass = 
"CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_NOT_EXISTS",

Review Comment:
   Sure.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -18,16 +18,17 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 
+import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
-import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, 
GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, 
OffsetSeqLog}
-import 
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, 
RocksDBStateStoreProvider, StateStore}
+import org.apache.spark.sql.execution.streaming.state._

Review Comment:
   No. The reason is I use three new classes in this pkg. I think it would be 
too long to include them all. What do you think?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -369,6 +369,21 @@ trait StateStoreProvider {
   /** Return an instance of [[StateStore]] representing state data of the 
given version */
   def getStore(version: Long): StateStore
 
+  /**
+   * This is an optional method, used by snapshotStartBatchId option when 
reading state generated
+   * by join operation as data source.
+   * Return an instance of [[StateStore]] representing state data of the given 
version.
+   * The State Store will be constructed from the batch at startVersion, and 
applying delta files
+   * up to the endVersion. If there is no snapshot file of batch startVersion, 
an exception will
+   * be thrown.
+   *
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   */
+  def getStore(startVersion: Long, endVersion: Long): StateStore =
+    throw new SparkUnsupportedOperationException("getStore with startVersion 
and endVersion " +

Review Comment:
   It seems that we cannot: to make this method optional, it has to have a 
default implementation; otherwise the build will fail.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -116,12 +116,15 @@ case class StateSourceOptions(
     batchId: Long,
     operatorId: Int,
     storeName: String,
-    joinSide: JoinSideValues) {
+    joinSide: JoinSideValues,
+    snapshotStartBatchId: Option[Long],
+    snapshotPartitionId: Option[Int]) {
   def stateCheckpointLocation: Path = new Path(resolvedCpLocation, 
DIR_NAME_STATE)
 
   override def toString: String = {
     s"StateSourceOptions(checkpointLocation=$resolvedCpLocation, 
batchId=$batchId, " +
-      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+      s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
+      s"snapshotStartBatchId=$snapshotStartBatchId, 
snapshotPartitionId=$snapshotPartitionId)"

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to