WweiL commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1637281535


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -544,6 +595,38 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     result
   }
 
+  private def loadMap(startVersion: Long, endVersion: Long): 
HDFSBackedStateStoreMap = {
+
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val startVersionMap =
+        synchronized { Option(loadedMaps.get(startVersion)) }
+              .orElse{
+                logWarning(
+                  log"The state for version ${MDC(LogKeys.FILE_VERSION, 
startVersion)} doesn't " +
+                  log"exist in loadedMaps. Reading snapshot file and delta 
files if needed..." +
+                  log"Note that this is normal for the first batch of starting 
query.")
+                readSnapshotFile(startVersion)}
+      if (startVersionMap.isEmpty) {
+        throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
+          snapshotFile(startVersion), toString(), null)
+      }
+      synchronized { putStateIntoStateCacheMap(startVersion, 
startVersionMap.get) }

Review Comment:
   Is it possible to refactor this to reuse the existing loadMap function, or 
to add a helper function for the shared logic?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -93,7 +93,14 @@ class StatePartitionReader(
   }
 
   private lazy val store: ReadStateStore = {
-    provider.getReadStore(partition.sourceOptions.batchId + 1)
+    if (partition.sourceOptions.snapshotStartBatchId.isEmpty) {

Review Comment:
   nit: we can use the match syntax here
   
   ```
   partition.sourceOptions.snapshotStartBatchId match {
     case None => ___
     case Some(batchId) => ___
   }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -369,6 +369,21 @@ trait StateStoreProvider {
   /** Return an instance of [[StateStore]] representing state data of the 
given version */
   def getStore(version: Long): StateStore
 
+  /**
+   * This is an optional method, used by snapshotStartBatchId option when 
reading state generated
+   * by join operation as data source.
+   * Return an instance of [[StateStore]] representing state data of the given 
version.
+   * The State Store will be constructed from the batch at startVersion, and 
applying delta files
+   * up to the endVersion. If there is no snapshot file of batch startVersion, 
an exception will
+   * be thrown.
+   *
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion checkpoint version to end with
+   */
+  def getStore(startVersion: Long, endVersion: Long): StateStore =
+    throw new SparkUnsupportedOperationException("getStore with startVersion 
and endVersion " +

Review Comment:
   can we just put nothing here? like 
   ```
   def getStore(version: Long): StateStore
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -309,19 +309,51 @@ private[sql] class RocksDBStateStoreProvider
     }
   }
 
+  override def getStore(startVersion: Long, endVersion: Long): StateStore = {
+    try {
+      if (startVersion < 1) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+      }
+      if (endVersion < startVersion) {

Review Comment:
   Why does this check differ from the one in the HDFS provider, which is:
   if (endVersion < startVersion || endVersion < 0) {
   
   Can we document the reason here?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends 
StateDataSourceTestBase {
   }
 }
 
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+  extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {

Review Comment:
   I see the implementation below, can we load the actual statestore from spark 
conf here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -81,9 +82,19 @@ class StateScan(
       assert((tail - head + 1) == partitionNums.length,
         s"No continuous partitions in state: 
${partitionNums.mkString("Array(", ", ", ")")}")
 
-      partitionNums.map {
-        pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
-      }.toArray
+      if (sourceOptions.snapshotPartitionId.isEmpty) {
+        partitionNums.map {
+          pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
+        }.toArray
+      }
+      else {

Review Comment:
   ditto match syntax



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -285,6 +315,27 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     }
   }
 
+  private def getLoadedMapForStore(startVersion: Long, endVersion: Long):
+    HDFSBackedStateStoreMap = synchronized {
+    try {
+      if (startVersion < 1) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+      }
+      if (endVersion < startVersion || endVersion < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
+      }
+
+      val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+      if (!(endVersion == 0)) {

Review Comment:
   endVersion != 0?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -683,6 +766,12 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     }
   }
 
+  /**
+  * try to read the snapshot file of the given version.
+  * If the snapshot file is not available, return None.
+   *
+   * @param version the version of the snapshot file
+  */

Review Comment:
   Thank you for adding the comment!



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -285,6 +315,27 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
     }
   }
 
+  private def getLoadedMapForStore(startVersion: Long, endVersion: Long):
+    HDFSBackedStateStoreMap = synchronized {
+    try {
+      if (startVersion < 1) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+      }
+      if (endVersion < startVersion || endVersion < 0) {

Review Comment:
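   If startVersion >= 1 (checked above), then any endVersion < 0 is already 
caught by `endVersion < startVersion`, so the `endVersion < 0` clause looks 
redundant?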



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -18,16 +18,17 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
 
+import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
 
-import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, 
GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, 
OffsetSeqLog}
-import 
org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, 
RocksDBStateStoreProvider, StateStore}
+import org.apache.spark.sql.execution.streaming.state._

Review Comment:
   Is this because these three are everything in that package?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -194,6 +195,79 @@ class StateDataSourceNegativeTestSuite extends 
StateDataSourceTestBase {
       }
     }
   }
+
+  test("ERROR: snapshotStartBatchId specified to negative") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, -1)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+    }
+  }
+
+  test("ERROR: snapshotPartitionId specified to negative") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, -1)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+    }
+  }
+
+  test("ERROR: snapshotStartBatchId specified without snapshotPartitionId or 
vice versa") {
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+    }
+
+    withTempDir { tempDir =>
+      val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)
+          .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, 0)
+          .load(tempDir.getAbsolutePath)
+      }
+      checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+        Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+    }
+  }
+
+  test("ERROR: snapshotStartBatchId is greater than snapshotEndBatchId") {
+    withTempDir { tempDir =>
+      val startBatchId = 1
+      val endBatchId = 0
+      val exc = intercept[StateDataSourceInvalidOptionValue] {
+        spark.read.format("statestore")
+          // trick to bypass getting the last committed batch before 
validating operator ID
+          .option(StateSourceOptions.BATCH_ID, 0)

Review Comment:
   this should be deleted?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends 
StateDataSourceTestBase {
   }
 }
 
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+  extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {

Review Comment:
   Why add the HDFSBackedStateStoreProvider type parameter here? The Spark 
conf is already set below:
   ```
   spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
         classOf[HDFSBackedStateStoreProvider].getName)
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends 
StateDataSourceTestBase {
   }
 }
 
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+  extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {
   override def beforeAll(): Unit = {
     super.beforeAll()
     spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
       classOf[HDFSBackedStateStoreProvider].getName)
+    // make sure we have a snapshot for every two delta files
+    spark.conf.set(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, 1)

Review Comment:
   Please add more details on the difference between HDFS and RocksDB that 
explain why setting this value is needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to