WweiL commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1637281535
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -544,6 +595,38 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
result
}
+  private def loadMap(startVersion: Long, endVersion: Long): HDFSBackedStateStoreMap = {
+
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val startVersionMap =
+        synchronized { Option(loadedMaps.get(startVersion)) }
+          .orElse{
+            logWarning(
+              log"The state for version ${MDC(LogKeys.FILE_VERSION, startVersion)} doesn't " +
+              log"exist in loadedMaps. Reading snapshot file and delta files if needed..." +
+              log"Note that this is normal for the first batch of starting query.")
+            readSnapshotFile(startVersion)}
+      if (startVersionMap.isEmpty) {
+        throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
+          snapshotFile(startVersion), toString(), null)
+      }
+      synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }
Review Comment:
is it possible to refactor this with the existing loadMap function, or to add a helper function for the shared logic? See the sketch below.
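A minimal sketch of what such a shared helper might look like, reusing the calls already quoted in the diff above; the name `loadStartVersionMap`, the omission of the warning log, and the idea of calling it from both `loadMap` overloads are assumptions, not the PR's actual code:
```
// Hypothetical helper that both loadMap(version) and loadMap(startVersion, endVersion)
// could call: resolve the map for startVersion from the in-memory cache, fall back to
// the snapshot file, and fail if neither is available.
private def loadStartVersionMap(startVersion: Long): HDFSBackedStateStoreMap = {
  val startVersionMap =
    synchronized { Option(loadedMaps.get(startVersion)) }
      .orElse(readSnapshotFile(startVersion))
  if (startVersionMap.isEmpty) {
    throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
      snapshotFile(startVersion), toString(), null)
  }
  synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }
  startVersionMap.get
}
```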
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -93,7 +93,14 @@ class StatePartitionReader(
}
private lazy val store: ReadStateStore = {
- provider.getReadStore(partition.sourceOptions.batchId + 1)
+ if (partition.sourceOptions.snapshotStartBatchId.isEmpty) {
Review Comment:
nit: we can use the match syntax here
```
partition.sourceOptions.snapshotStartBatchId match {
  case None => ___
  case Some(batchId) => ___
}
```
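For reference, a rough sketch of how the match could read; the two-argument `getReadStore` overload used in the `Some` branch is assumed from this PR's new API and may not match the final signature:
```
private lazy val store: ReadStateStore = {
  partition.sourceOptions.snapshotStartBatchId match {
    // No snapshot start batch: keep the existing single-version read path.
    case None =>
      provider.getReadStore(partition.sourceOptions.batchId + 1)
    // Snapshot start batch given: replay from that snapshot up to the requested batch.
    case Some(snapshotStartBatchId) =>
      provider.getReadStore(snapshotStartBatchId + 1, partition.sourceOptions.batchId + 1)
  }
}
```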
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -369,6 +369,21 @@ trait StateStoreProvider {
   /** Return an instance of [[StateStore]] representing state data of the given version */
def getStore(version: Long): StateStore
+  /**
+   * This is an optional method, used by the snapshotStartBatchId option when reading state
+   * generated by a join operation as a data source.
+   * Return an instance of [[StateStore]] representing state data of the given version.
+   * The state store will be constructed from the snapshot of batch startVersion, applying delta
+   * files up to endVersion. If there is no snapshot file for batch startVersion, an exception
+   * will be thrown.
+   *
+   * @param startVersion checkpoint version of the snapshot to start with
+   * @param endVersion   checkpoint version to end with
+   */
+  def getStore(startVersion: Long, endVersion: Long): StateStore =
+    throw new SparkUnsupportedOperationException("getStore with startVersion and endVersion " +
Review Comment:
can we just put nothing here? like
```
def getStore(version: Long): StateStore
```
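If the intent is to mirror the existing single-argument declaration, a sketch of leaving the new method abstract would be:
```
/** Return a [[StateStore]] built from the snapshot at startVersion plus deltas up to endVersion. */
def getStore(startVersion: Long, endVersion: Long): StateStore
```
Note the trade-off: an abstract method forces every `StateStoreProvider` implementation to supply it, whereas the default that throws keeps the method optional for providers that don't support snapshot reads.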
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -309,19 +309,51 @@ private[sql] class RocksDBStateStoreProvider
}
}
+ override def getStore(startVersion: Long, endVersion: Long): StateStore = {
+ try {
+ if (startVersion < 1) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+ }
+ if (endVersion < startVersion) {
Review Comment:
why this difference here and in HDFS?
```
if (endVersion < startVersion || endVersion < 0) {
```
can we document the reason here?
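One way to make the contract explicit in both providers, as a hedged sketch (`validateVersionRange` is a hypothetical helper, not part of this PR):
```
// Hypothetical shared validation. With startVersion >= 1 enforced first,
// endVersion < startVersion already rejects any negative endVersion, so a
// separate `endVersion < 0` check would be redundant.
private def validateVersionRange(startVersion: Long, endVersion: Long): Unit = {
  if (startVersion < 1) {
    throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
  }
  if (endVersion < startVersion) {
    throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
  }
}
```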
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends StateDataSourceTestBase {
}
}
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+ extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {
Review Comment:
I see the implementation below; can we load the actual state store provider from the Spark conf here?
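A possible sketch, assuming the suite wants to derive the provider from the already-configured SQL conf rather than from a type parameter; `stateStoreProviderClass` is a hypothetical helper name:
```
// Hypothetical: resolve the provider class from the conf set in beforeAll(),
// instead of hard-coding it as a type parameter on the suite.
// Utils is org.apache.spark.util.Utils.
protected def stateStoreProviderClass: Class[_ <: StateStoreProvider] =
  Utils.classForName(spark.conf.get(SQLConf.STATE_STORE_PROVIDER_CLASS.key))
    .asSubclass(classOf[StateStoreProvider])
```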
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -81,9 +82,19 @@ class StateScan(
assert((tail - head + 1) == partitionNums.length,
s"No continuous partitions in state:
${partitionNums.mkString("Array(", ", ", ")")}")
- partitionNums.map {
- pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
- }.toArray
+ if (sourceOptions.snapshotPartitionId.isEmpty) {
+ partitionNums.map {
+ pn => new StateStoreInputPartition(pn, queryId, sourceOptions)
+ }.toArray
+ }
+ else {
Review Comment:
ditto: the match syntax could be used here as well; see the sketch below.
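A rough sketch of the match form; what exactly goes in the `Some` branch is elided in the diff above, so the single-partition array below is an assumption:
```
sourceOptions.snapshotPartitionId match {
  // No snapshot partition requested: one input partition per state partition, as before.
  case None =>
    partitionNums.map { pn =>
      new StateStoreInputPartition(pn, queryId, sourceOptions)
    }.toArray
  // A specific partition requested: plan only that partition.
  case Some(partitionId) =>
    Array(new StateStoreInputPartition(partitionId, queryId, sourceOptions))
}
```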
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -285,6 +315,27 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
}
+  private def getLoadedMapForStore(startVersion: Long, endVersion: Long):
+      HDFSBackedStateStoreMap = synchronized {
+ try {
+ if (startVersion < 1) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+ }
+ if (endVersion < startVersion || endVersion < 0) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
+ }
+
+ val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+ if (!(endVersion == 0)) {
Review Comment:
endVersion != 0?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -683,6 +766,12 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
}
+ /**
+ * try to read the snapshot file of the given version.
+ * If the snapshot file is not available, return None.
+ *
+ * @param version the version of the snapshot file
+ */
Review Comment:
Thank you for adding the comment!
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -285,6 +315,27 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
}
+  private def getLoadedMapForStore(startVersion: Long, endVersion: Long):
+      HDFSBackedStateStoreMap = synchronized {
+ try {
+ if (startVersion < 1) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+ }
+ if (endVersion < startVersion || endVersion < 0) {
Review Comment:
if startVersion >= 1, then endVersion can't be < 0 here, right?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -18,16 +18,17 @@ package org.apache.spark.sql.execution.datasources.v2.state
import java.io.{File, FileWriter}
+import org.apache.hadoop.conf.Configuration
import org.scalatest.Assertions
-import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
-import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore}
+import org.apache.spark.sql.execution.streaming.state._
Review Comment:
is this because these three are everything in that pkg?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -194,6 +195,79 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
}
}
}
+
+ test("ERROR: snapshotStartBatchId specified to negative") {
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+ spark.read.format("statestore")
+        // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, -1)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+ }
+ }
+
+ test("ERROR: snapshotPartitionId specified to negative") {
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+ spark.read.format("statestore")
+        // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, -1)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+ }
+ }
+
+ test("ERROR: snapshotStartBatchId specified without snapshotPartitionId or
vice versa") {
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+ spark.read.format("statestore")
+        // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+ }
+
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+ spark.read.format("statestore")
+        // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, 0)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+ }
+ }
+
+ test("ERROR: snapshotStartBatchId is greater than snapshotEndBatchId") {
+ withTempDir { tempDir =>
+ val startBatchId = 1
+ val endBatchId = 0
+ val exc = intercept[StateDataSourceInvalidOptionValue] {
+ spark.read.format("statestore")
+        // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
Review Comment:
this should be deleted?
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends StateDataSourceTestBase {
}
}
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+ extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {
Review Comment:
why add the HDFSBackedStateStoreProvider type parameter here? The Spark conf is already set below:
```
spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
  classOf[HDFSBackedStateStoreProvider].getName)
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends StateDataSourceTestBase {
}
}
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+ extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
classOf[HDFSBackedStateStoreProvider].getName)
+ // make sure we have a snapshot for every two delta files
+ spark.conf.set(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, 1)
Review Comment:
pls add more details on the difference between hdfs and rocksdb, that
explains why setting this value is needed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]