eason-yuchen-liu commented on code in PR #46944:
URL: https://github.com/apache/spark/pull/46944#discussion_r1638823883
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -544,6 +595,38 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
result
}
+ private def loadMap(startVersion: Long, endVersion: Long): HDFSBackedStateStoreMap = {
+
+ val (result, elapsedMs) = Utils.timeTakenMs {
+ val startVersionMap = synchronized { Option(loadedMaps.get(startVersion)) }
+ .orElse{
+ logWarning(
+ log"The state for version ${MDC(LogKeys.FILE_VERSION,
startVersion)} doesn't " +
+ log"exist in loadedMaps. Reading snapshot file and delta
files if needed..." +
+ log"Note that this is normal for the first batch of starting
query.")
+ readSnapshotFile(startVersion)}
+ if (startVersionMap.isEmpty) {
+ throw QueryExecutionErrors.failedToReadSnapshotFileNotExistsError(
+ snapshotFile(startVersion), toString(), null)
+ }
+ synchronized { putStateIntoStateCacheMap(startVersion, startVersionMap.get) }
Review Comment:
For HDFS it is hard because the common part is really small, but for RocksDB there is room for refactoring. For example, this PR tests whether we can extract a common part of both `load` functions:
https://github.com/apache/spark/pull/46927
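To make the idea concrete, here is a minimal sketch of the shared-helper shape (hypothetical names only, not the actual RocksDB code): both the regular `load` and the snapshot-pinned `load` funnel into one helper that replays delta files on top of a chosen snapshot.

```scala
// Hypothetical sketch: the shared replay core extracted out of the two load paths.
// StateReplayer, readSnapshot, readDelta and latestSnapshotAtOrBelow are illustrative names.
class StateReplayer(
    latestSnapshotAtOrBelow: Long => Long,
    readSnapshot: Long => Map[String, String],
    readDelta: Long => Seq[(String, String)]) {

  // Shared core: start from the snapshot at `snapshotVersion`, then apply the delta
  // files of every version up to and including `endVersion`.
  private def loadFrom(snapshotVersion: Long, endVersion: Long): Map[String, String] = {
    var state = readSnapshot(snapshotVersion)
    (snapshotVersion + 1 to endVersion).foreach { v =>
      state = state ++ readDelta(v).toMap
    }
    state
  }

  // Regular path: pick the latest available snapshot at or below the requested version.
  def load(version: Long): Map[String, String] =
    loadFrom(latestSnapshotAtOrBelow(version), version)

  // Snapshot-read path (snapshotStartBatchId): the caller pins the starting snapshot.
  def load(startVersion: Long, endVersion: Long): Map[String, String] =
    loadFrom(startVersion, endVersion)
}
```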
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -300,35 +374,104 @@ class StateDataSourceSQLConfigSuite extends StateDataSourceTestBase {
}
}
-class HDFSBackedStateDataSourceReadSuite extends StateDataSourceReadSuite {
+class HDFSBackedStateDataSourceReadSuite
+ extends StateDataSourceReadSuite[HDFSBackedStateStoreProvider] {
Review Comment:
On second thought, I think it is not necessary to use a type-parameterized class here. I have reverted the change, and also made sure there is only one place defining the state store provider.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -309,19 +309,51 @@ private[sql] class RocksDBStateStoreProvider
}
}
+ override def getStore(startVersion: Long, endVersion: Long): StateStore = {
+ try {
+ if (startVersion < 1) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+ }
+ if (endVersion < startVersion) {
Review Comment:
This should be the correct one.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -194,6 +195,79 @@ class StateDataSourceNegativeTestSuite extends StateDataSourceTestBase {
}
}
}
+
+ test("ERROR: snapshotStartBatchId specified to negative") {
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+ spark.read.format("statestore")
+ // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, -1)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+ }
+ }
+
+ test("ERROR: snapshotPartitionId specified to negative") {
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceInvalidOptionValueIsNegative] {
+ spark.read.format("statestore")
+ // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, -1)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_INVALID_OPTION_VALUE.IS_NEGATIVE", "42616",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+ }
+ }
+
+ test("ERROR: snapshotStartBatchId specified without snapshotPartitionId or
vice versa") {
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+ spark.read.format("statestore")
+ // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_START_BATCH_ID, 0)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_PARTITION_ID))
+ }
+
+ withTempDir { tempDir =>
+ val exc = intercept[StateDataSourceUnspecifiedRequiredOption] {
+ spark.read.format("statestore")
+ // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
+ .option(StateSourceOptions.SNAPSHOT_PARTITION_ID, 0)
+ .load(tempDir.getAbsolutePath)
+ }
+ checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
+ Map("optionName" -> StateSourceOptions.SNAPSHOT_START_BATCH_ID))
+ }
+ }
+
+ test("ERROR: snapshotStartBatchId is greater than snapshotEndBatchId") {
+ withTempDir { tempDir =>
+ val startBatchId = 1
+ val endBatchId = 0
+ val exc = intercept[StateDataSourceInvalidOptionValue] {
+ spark.read.format("statestore")
+ // trick to bypass getting the last committed batch before validating operator ID
+ .option(StateSourceOptions.BATCH_ID, 0)
Review Comment:
Do you mean the comment or the code? The code cannot be removed; otherwise the source will try to look up the maximum available batch id and throw an error, since nothing is there. The tests above rely on this a lot.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -190,7 +195,28 @@ object StateSourceOptions extends DataSourceOptions {
throw StateDataSourceErrors.conflictOptions(Seq(JOIN_SIDE, STORE_NAME))
}
- StateSourceOptions(resolvedCpLocation, batchId, operatorId, storeName, joinSide)
+ val snapshotStartBatchId = Option(options.get(SNAPSHOT_START_BATCH_ID)).map(_.toLong)
+ if (snapshotStartBatchId.exists(_ < 0)) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_START_BATCH_ID)
+ } else if (snapshotStartBatchId.exists(_ > batchId)) {
+ throw StateDataSourceErrors.invalidOptionValue(
+ SNAPSHOT_START_BATCH_ID, s"value should be less than or equal to
$batchId")
+ }
+
+ val snapshotPartitionId = Option(options.get(SNAPSHOT_PARTITION_ID)).map(_.toInt)
+ if (snapshotPartitionId.exists(_ < 0)) {
+ throw StateDataSourceErrors.invalidOptionValueIsNegative(SNAPSHOT_PARTITION_ID)
Review Comment:
Partition info is not available here because this function needs to return immediately; the number of partitions is only figured out during planning.
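A rough sketch of where the upper-bound check can live instead (illustrative names, not the actual planner code): once planning knows how many partitions the state has, an out-of-range snapshotPartitionId maps to the new SNAPSHOT_PARTITION_ID_NOT_FOUND condition.

```scala
// Illustrative only: option parsing can only verify `>= 0`; the upper bound has to wait
// until the number of state partitions is known at planning time.
def selectPartitions(numPartitions: Int, snapshotPartitionId: Option[Int]): Seq[Int] =
  snapshotPartitionId match {
    case Some(id) if id >= numPartitions =>
      // would surface as SNAPSHOT_PARTITION_ID_NOT_FOUND in the real code path
      throw new IllegalArgumentException(s"Partition id $id not found for given state source.")
    case Some(id) => Seq(id)               // read only the requested partition
    case None     => 0 until numPartitions // read all partitions
  }
```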
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -285,6 +315,27 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
}
+ private def getLoadedMapForStore(startVersion: Long, endVersion: Long): HDFSBackedStateStoreMap = synchronized {
+ try {
+ if (startVersion < 1) {
+ throw QueryExecutionErrors.unexpectedStateStoreVersion(startVersion)
+ }
+ if (endVersion < startVersion || endVersion < 0) {
Review Comment:
You are right. Should only be `endVersion < startVersion`.
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -239,6 +244,11 @@
"Error reading streaming state file of <fileToRead> does not exist.
If the stream job is restarted with a new or updated state operation, please
create a new checkpoint location or clear the existing checkpoint location."
]
},
+ "SNAPSHOT_PARTITION_ID_NOT_FOUND" : {
+ "message" : [
+ "Partition id <snapshotPartitionId> not found for given state
source."
Review Comment:
Sure.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2159,6 +2159,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
cause = null)
}
+ def failedToReadSnapshotFileNotExistsError(
+ fileToRead: Path,
+ clazz: String,
+ f: Throwable): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_NOT_EXISTS",
Review Comment:
Sure.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -18,16 +18,17 @@ package org.apache.spark.sql.execution.datasources.v2.state
import java.io.{File, FileWriter}
+import org.apache.hadoop.conf.Configuration
import org.scalatest.Assertions
-import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
import org.apache.spark.sql.execution.streaming.{CommitLog, MemoryStream, OffsetSeqLog}
-import org.apache.spark.sql.execution.streaming.state.{HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, StateStore}
+import org.apache.spark.sql.execution.streaming.state._
Review Comment:
No. The reason is that I use three new classes from this package, and I think listing them all would make the import line too long. What do you think?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -369,6 +369,21 @@ trait StateStoreProvider {
/** Return an instance of [[StateStore]] representing state data of the given version */
def getStore(version: Long): StateStore
+ /**
+ * This is an optional method, used by snapshotStartBatchId option when reading state generated
+ * by join operation as data source.
+ * Return an instance of [[StateStore]] representing state data of the given version.
+ * The State Store will be constructed from the batch at startVersion, and applying delta files
+ * up to the endVersion. If there is no snapshot file of batch startVersion, an exception will
+ * be thrown.
+ *
+ * @param startVersion checkpoint version of the snapshot to start with
+ * @param endVersion checkpoint version to end with
+ */
+ def getStore(startVersion: Long, endVersion: Long): StateStore =
+ throw new SparkUnsupportedOperationException("getStore with startVersion
and endVersion " +
Review Comment:
It seems that we cannot: to make this method optional it has to have a default implementation, otherwise every existing provider would fail to compile.
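A minimal sketch of the constraint with simplified types (not the real trait): declaring the new overload abstract would break every existing `StateStoreProvider` implementation at compile time, so the default body that throws is exactly what makes it optional.

```scala
// Simplified illustration; the real method lives in StateStore.scala.
trait Store

trait ProviderSketch {
  // existing required method: every provider already implements this
  def getStore(version: Long): Store

  // new optional overload: the default body keeps old providers compiling;
  // providers that support snapshot reads override it
  def getStore(startVersion: Long, endVersion: Long): Store =
    throw new UnsupportedOperationException(
      s"getStore with startVersion and endVersion is not supported by ${getClass.getName}")
}
```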
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala:
##########
@@ -116,12 +116,15 @@ case class StateSourceOptions(
batchId: Long,
operatorId: Int,
storeName: String,
- joinSide: JoinSideValues) {
+ joinSide: JoinSideValues,
+ snapshotStartBatchId: Option[Long],
+ snapshotPartitionId: Option[Int]) {
def stateCheckpointLocation: Path = new Path(resolvedCpLocation, DIR_NAME_STATE)
override def toString: String = {
s"StateSourceOptions(checkpointLocation=$resolvedCpLocation,
batchId=$batchId, " +
- s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide)"
+ s"operatorId=$operatorId, storeName=$storeName, joinSide=$joinSide, " +
+ s"snapshotStartBatchId=$snapshotStartBatchId,
snapshotPartitionId=$snapshotPartitionId)"
Review Comment:
Sure.