This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bde87d66d0d [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails
bde87d66d0d is described below

commit bde87d66d0d23d35ed82d412dac602c105b959a4
Author: Lucy Yao <lucy....@databricks.com>
AuthorDate: Fri Jul 21 08:51:32 2023 +0900

    [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails

    ### What changes were proposed in this pull request?

    Migrated errors from the StateStoreProvider.getStore() and StateStoreProvider.getReadStore() entry points to the new error class framework. The ticket for this issue is: https://issues.apache.org/jira/browse/SPARK-44252.

    ### Why are the changes needed?

    Essentially, we wrap errors raised while loading state in getStore and getReadStore to give better error context.

    ### Does this PR introduce _any_ user-facing change?

    Yes

    ### How was this patch tested?

    Wrote and updated tests that ensure the new errors are thrown as expected.

    Closes #41705 from lucyyao-db/SC-132521-OSS.

    Authored-by: Lucy Yao <lucy....@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  69 +++++++++++--
 ...nditions-cannot-load-state-store-error-class.md |  69 +++++++++++++
 docs/sql-error-conditions.md                       |   8 ++
 .../spark/sql/errors/QueryExecutionErrors.scala    | 107 +++++++++++++++++++--
 .../state/HDFSBackedStateStoreProvider.scala       |  31 +++---
 .../sql/execution/streaming/state/RocksDB.scala    |   8 +-
 .../streaming/state/RocksDBFileManager.scala       |   9 +-
 .../state/RocksDBStateStoreProvider.scala          |  27 ++++-
 .../sql/execution/streaming/state/StateStore.scala |   9 +-
 .../streaming/state/StateStoreChangelog.scala      |   6 +-
 .../execution/streaming/state/RocksDBSuite.scala   |  77 +++++++++++--
 .../streaming/state/StateStoreSuite.scala          |  63 ++++++++++--
 12 files changed, 416 insertions(+), 67 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 4debf3da0b8..7913a9b9241 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -143,6 +143,65 @@
       "Could not load Protobuf class with name <protobufClassName>. <explanation>."
     ]
   },
+  "CANNOT_LOAD_STATE_STORE" : {
+    "message" : [
+      "An error occurred during loading state."
+    ],
+    "subClass" : {
+      "CANNOT_READ_CHECKPOINT" : {
+        "message" : [
+          "Cannot read RocksDB checkpoint metadata. Expected <expectedVersion>, but found <actualVersion>."
+        ]
+      },
+      "CANNOT_READ_DELTA_FILE_KEY_SIZE" : {
+        "message" : [
+          "Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>."
+        ]
+      },
+      "CANNOT_READ_DELTA_FILE_NOT_EXISTS" : {
+        "message" : [
+          "Error reading delta file <fileToRead> of <clazz>: <fileToRead> does not exist."
+        ]
+      },
+      "CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE" : {
+        "message" : [
+          "Error reading snapshot file <fileToRead> of <clazz>: key size cannot be <keySize>."
+        ]
+      },
+      "CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE" : {
+        "message" : [
+          "Error reading snapshot file <fileToRead> of <clazz>: value size cannot be <valueSize>."
+        ]
+      },
+      "CANNOT_READ_STREAMING_STATE_FILE" : {
+        "message" : [
+          "Error reading streaming state file of <fileToRead> does not exist. If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location."
+ ] + }, + "UNCATEGORIZED" : { + "message" : [ + "" + ] + }, + "UNEXPECTED_FILE_SIZE" : { + "message" : [ + "Copied <dfsFile> to <localFile>, expected <expectedSize> bytes, found <localFileSize> bytes." + ] + }, + "UNEXPECTED_VERSION" : { + "message" : [ + "Version cannot be <version> because it is less than 0." + ] + }, + "UNRELEASED_THREAD_ERROR" : { + "message" : [ + "<loggingId>: RocksDB instance could not be acquired by <newAcquiredThreadInfo> as it was not released by <acquiredThreadInfo> after <timeWaitedMs> ms.", + "Thread holding the lock has trace: <stackTraceOutput>" + ] + } + }, + "sqlState" : "58030" + }, "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE" : { "message" : [ "Failed to merge incompatible data types <left> and <right>. Please check the data types of the columns being merged and ensure that they are compatible. If necessary, consider casting the columns to compatible data types before attempting the merge." @@ -5749,16 +5808,6 @@ "Foreach writer has been aborted due to a task failure." ] }, - "_LEGACY_ERROR_TEMP_2258" : { - "message" : [ - "Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>." - ] - }, - "_LEGACY_ERROR_TEMP_2259" : { - "message" : [ - "Error reading snapshot file <fileToRead> of <clazz>: <message>" - ] - }, "_LEGACY_ERROR_TEMP_2260" : { "message" : [ "Cannot purge as it might break internal state." diff --git a/docs/sql-error-conditions-cannot-load-state-store-error-class.md b/docs/sql-error-conditions-cannot-load-state-store-error-class.md new file mode 100644 index 00000000000..50450be689f --- /dev/null +++ b/docs/sql-error-conditions-cannot-load-state-store-error-class.md @@ -0,0 +1,69 @@ +--- +layout: global +title: CANNOT_LOAD_STATE_STORE error class +displayTitle: CANNOT_LOAD_STATE_STORE error class +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +SQLSTATE: 58030 + +An error occurred during loading state. + +This error class has the following derived error classes: + +## CANNOT_READ_CHECKPOINT + +Cannot read RocksDB checkpoint metadata. Expected `<expectedVersion>`, but found `<actualVersion>`. + +## CANNOT_READ_DELTA_FILE_KEY_SIZE + +Error reading delta file `<fileToRead>` of `<clazz>`: key size cannot be `<keySize>`. + +## CANNOT_READ_DELTA_FILE_NOT_EXISTS + +Error reading delta file `<fileToRead>` of `<clazz>`: `<fileToRead>` does not exist. + +## CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE + +Error reading snapshot file `<fileToRead>` of `<clazz>`: key size cannot be `<keySize>`. + +## CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE + +Error reading snapshot file `<fileToRead>` of `<clazz>`: value size cannot be `<valueSize>`. + +## CANNOT_READ_STREAMING_STATE_FILE + +Error reading streaming state file of `<fileToRead>` does not exist. 
+If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location.
+
+## UNCATEGORIZED
+
+
+## UNEXPECTED_FILE_SIZE
+
+Copied `<dfsFile>` to `<localFile>`, expected `<expectedSize>` bytes, found `<localFileSize>` bytes.
+
+## UNEXPECTED_VERSION
+
+Version cannot be `<version>` because it is less than 0.
+
+## UNRELEASED_THREAD_ERROR
+
+`<loggingId>`: RocksDB instance could not be acquired by `<newAcquiredThreadInfo>` as it was not released by `<acquiredThreadInfo>` after `<timeWaitedMs>` ms.
+Thread holding the lock has trace: `<stackTraceOutput>`
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 00fe6d75f53..cd04c414df3 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -171,6 +171,14 @@ SQLSTATE: none assigned
 
 Could not load Protobuf class with name `<protobufClassName>`. `<explanation>`.
 
+### [CANNOT_LOAD_STATE_STORE](sql-error-conditions-cannot-load-state-store-error-class.html)
+
+SQLSTATE: 58030
+
+An error occurred during loading state.
+
+For more details see [CANNOT_LOAD_STATE_STORE](sql-error-conditions-cannot-load-state-store-error-class.html)
+
 ### CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE
 
 [SQLSTATE: 42825](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 433d23eaf3f..983648ff673 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.errors
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.{File, FileNotFoundException, IOException}
 import java.lang.reflect.InvocationTargetException
 import java.net.{URISyntaxException, URL}
 import java.time.{DateTimeException, LocalDate}
@@ -2442,26 +2442,64 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       s"but it's ${endSeconds.toString} now.")
   }
 
-  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+  def failedToReadDeltaFileKeySizeError(
+      fileToRead: Path,
+      clazz: String,
+      keySize: Int): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2258",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_KEY_SIZE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "keySize" -> keySize.toString()),
+        "keySize" -> keySize.toString),
       cause = null)
   }
 
-  def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
+  def failedToReadDeltaFileNotExistsError(
+      fileToRead: Path,
+      clazz: String,
+      f: Throwable): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2259",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString(),
+        "clazz" -> clazz),
+      cause = f)
+  }
+
+  def failedToReadSnapshotFileKeySizeError(
+      fileToRead: Path,
+      clazz: String,
+      keySize: Int): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "message" -> message),
+        "keySize" -> keySize.toString),
       cause = null)
   }
 
+  def failedToReadSnapshotFileValueSizeError(
+      fileToRead: Path,
+      clazz: String,
+      valueSize: Int): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_VALUE_SIZE",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString(),
+        "clazz" -> clazz,
+        "valueSize" -> valueSize.toString),
+      cause = null)
+  }
+
+  def failedToReadStreamingStateFileError(fileToRead: Path, f: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+      messageParameters = Map("fileToRead" -> fileToRead.toString()),
+      cause = f)
+  }
+
   def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = {
     new SparkUnsupportedOperationException(
       errorClass = "_LEGACY_ERROR_TEMP_2260",
@@ -2836,6 +2874,61 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(
+      loggingId: String,
+      newAcquiredThreadInfo: String,
+      acquiredThreadInfo: String,
+      timeWaitedMs: Long,
+      stackTraceOutput: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+      messageParameters = Map(
+        "loggingId" -> loggingId,
+        "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+        "acquiredThreadInfo" -> acquiredThreadInfo,
+        "timeWaitedMs" -> timeWaitedMs.toString,
+        "stackTraceOutput" -> stackTraceOutput),
+      cause = null)
+  }
+
+  def cannotReadCheckpoint(expectedVersion: String, actualVersion: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+      messageParameters = Map(
+        "expectedVersion" -> expectedVersion,
+        "actualVersion" -> actualVersion),
+      cause = null)
+  }
+
+  def unexpectedFileSize(
+      dfsFile: Path,
+      localFile: File,
+      expectedSize: Long,
+      localFileSize: Long): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_FILE_SIZE",
+      messageParameters = Map(
+        "dfsFile" -> dfsFile.toString,
+        "localFile" -> localFile.toString,
+        "expectedSize" -> expectedSize.toString,
+        "localFileSize" -> localFileSize.toString),
+      cause = null)
+  }
+
+  def unexpectedStateStoreVersion(version: Long): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+      messageParameters = Map("version" -> version.toString),
+      cause = null)
+  }
+
+  def cannotLoadStore(e: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+      messageParameters = Map.empty,
+      cause = e)
+  }
+
   def hllInvalidLgK(function: String, min: Int, max: Int, value: String): Throwable = {
     new SparkRuntimeException(
       errorClass = "HLL_INVALID_LG_K",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index fbf4b357a35..afa1fdaa223 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -218,12 +218,19 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   private def getLoadedMapForStore(version: Long): HDFSBackedStateStoreMap = synchronized {
-    require(version >= 0, "Version cannot be less than 0")
-    val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
-    if (version > 0) {
-      newMap.putAll(loadMap(version))
+    try {
+      if (version < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      }
+      val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
+      if (version > 0) {
+        newMap.putAll(loadMap(version))
+      }
+      newMap
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
-    newMap
   }
 
   override def init(
@@ -457,8 +464,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       fm.open(fileToRead)
     } catch {
       case f: FileNotFoundException =>
-        throw new IllegalStateException(
-          s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+        throw QueryExecutionErrors.failedToReadDeltaFileNotExistsError(fileToRead, toString(), f)
     }
     try {
       input = decompressStream(sourceStream)
@@ -469,7 +475,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         if (keySize == -1) {
           eof = true
         } else if (keySize < 0) {
-          throw QueryExecutionErrors.failedToReadDeltaFileError(fileToRead, toString(), keySize)
+          throw QueryExecutionErrors.failedToReadDeltaFileKeySizeError(
+            fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
@@ -572,8 +579,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
         if (keySize == -1) {
           eof = true
         } else if (keySize < 0) {
-          throw QueryExecutionErrors.failedToReadSnapshotFileError(
-            fileToRead, toString(), s"key size cannot be $keySize")
+          throw QueryExecutionErrors.failedToReadSnapshotFileKeySizeError(
+            fileToRead, toString(), keySize)
         } else {
           val keyRowBuffer = new Array[Byte](keySize)
           ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
@@ -583,8 +590,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
           val valueSize = input.readInt()
           if (valueSize < 0) {
-            throw QueryExecutionErrors.failedToReadSnapshotFileError(
-              fileToRead, toString(), s"value size cannot be $valueSize")
+            throw QueryExecutionErrors.failedToReadSnapshotFileValueSizeError(
+              fileToRead, toString(), valueSize)
           } else {
             val valueRowBuffer = new Array[Byte](valueSize)
             ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 65299ea37ef..7961c5e716b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -34,6 +34,7 @@ import org.rocksdb.TickerType._
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.util.{NextIterator, Utils}
 
 /**
@@ -553,11 +554,8 @@ class RocksDB(
     }
     if (isAcquiredByDifferentThread) {
       val stackTraceOutput = acquiredThreadInfo.threadRef.get.get.getStackTrace.mkString("\n")
-      val msg = s"RocksDB instance could not be acquired by $newAcquiredThreadInfo as it " +
-        s"was not released by $acquiredThreadInfo after $timeWaitedMs ms.\n" +
-        s"Thread holding the lock has trace: $stackTraceOutput"
-      logError(msg)
-      throw new IllegalStateException(s"$loggingId: $msg")
+      throw QueryExecutionErrors.unreleasedThreadError(loggingId, newAcquiredThreadInfo.toString,
+        acquiredThreadInfo.toString, timeWaitedMs,
+        stackTraceOutput)
     } else {
       acquiredThreadInfo = newAcquiredThreadInfo
       // Add a listener to always release the lock when the task (if active) completes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 0891d773713..ed04472b62c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -40,6 +40,7 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -526,9 +527,8 @@ class RocksDBFileManager(
         val localFileSize = localFile.length()
         val expectedSize = file.sizeBytes
         if (localFileSize != expectedSize) {
-          throw new IllegalStateException(
-            s"Copied $dfsFile to $localFile," +
-            s" expected $expectedSize bytes, found $localFileSize bytes ")
+          throw QueryExecutionErrors.unexpectedFileSize(dfsFile, localFile, expectedSize,
+            localFileSize)
         }
         filesCopied += 1
         bytesCopied += localFileSize
@@ -717,8 +717,7 @@ object RocksDBCheckpointMetadata {
     try {
       val versionLine = reader.readLine()
       if (versionLine != s"v$VERSION") {
-        throw new IllegalStateException(
-          s"Cannot read RocksDB checkpoint metadata of version $versionLine")
+        throw QueryExecutionErrors.cannotReadCheckpoint(versionLine, s"v$VERSION")
       }
       Serialization.read[RocksDBCheckpointMetadata](reader)
     } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 10f207c7ec1..53fd06fd24c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -190,15 +191,29 @@ private[sql] class RocksDBStateStoreProvider
   override def stateStoreId: StateStoreId = stateStoreId_
 
   override def getStore(version: Long): StateStore = {
-    require(version >= 0, "Version cannot be less than 0")
-    rocksDB.load(version)
-    new RocksDBStateStore(version)
+    try {
+      if (version < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      }
+      rocksDB.load(version)
+      new RocksDBStateStore(version)
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+    }
   }
 
   override def getReadStore(version: Long): StateStore = {
-    require(version >= 0, "Version cannot be less than 0")
-    rocksDB.load(version, true)
-    new RocksDBStateStore(version)
+    try {
+      if (version < 0) {
+        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      }
+      rocksDB.load(version, true)
+      new RocksDBStateStore(version)
+    }
+    catch {
+      case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
+    }
   }
 
   override def doMaintenance(): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 96c7b61f205..359cff81aea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.UnsafeRowUtils
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.types.StructType
@@ -486,7 +487,9 @@ object StateStore extends Logging {
       version: Long,
       storeConf: StateStoreConf,
       hadoopConf: Configuration): ReadStateStore = {
-    require(version >= 0)
+    if (version < 0) {
+      throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+    }
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       numColsPrefixKey, storeConf, hadoopConf)
     storeProvider.getReadStore(version)
@@ -501,7 +504,9 @@ object StateStore extends Logging {
       version: Long,
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStore = {
-    require(version >= 0)
+    if (version < 0) {
+      throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+    }
     val storeProvider = getStateStoreProvider(storeProviderId, keySchema, valueSchema,
       numColsPrefixKey, storeConf, hadoopConf)
     storeProvider.getStore(version)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index 372cbb6d986..f15feb2b2ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FSError, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
 import org.apache.spark.util.NextIterator
@@ -130,10 +131,7 @@ class StateStoreChangelogReader(
       fm.open(fileToRead)
     } catch {
       case f: FileNotFoundException =>
-        throw new IllegalStateException(
-          s"Error reading streaming state file of $fileToRead does not exist. " +
" + - "If the stream job is restarted with a new or updated state operation, please" + - " create a new checkpoint location or clear the existing checkpoint location.", f) + throw QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f) } private val input: DataInputStream = decompressStream(sourceStream) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index e31b05c362f..b4b67f381d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.scalactic.source.Position import org.scalatest.Tag +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager import org.apache.spark.sql.internal.SQLConf @@ -123,12 +124,37 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } test("RocksDB: load version that doesn't exist") { + val provider = new RocksDBStateStoreProvider() + var ex = intercept[SparkException] { + provider.getStore(-1) + } + checkError( + ex, + errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED", + parameters = Map.empty + ) + ex = intercept[SparkException] { + provider.getReadStore(-1) + } + checkError( + ex, + errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED", + parameters = Map.empty + ) + val remoteDir = Utils.createTempDir().toString new File(remoteDir).delete() // to make sure that the directory gets created withDB(remoteDir) { db => - intercept[IllegalStateException] { + ex = intercept[SparkException] { db.load(1) } + checkError( + ex, + errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE", + parameters = Map( + "fileToRead" -> s"$remoteDir/1.changelog" + ) + ) } } @@ -704,12 +730,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.load(0) // Current thread should be able to load again // Another thread should not be able to load while current thread is using it - val ex = intercept[IllegalStateException] { + var ex = intercept[SparkException] { ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) } } - // Assert that the error message contains the stack trace - assert(ex.getMessage.contains("Thread holding the lock has trace:")) - assert(ex.getMessage.contains("runInNewThread")) + checkError( + ex, + errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR", + parameters = Map( + "loggingId" -> "\\[Thread-\\d+\\]", + "newAcquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]", + "acquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]", + "timeWaitedMs" -> "\\d+", + "stackTraceOutput" -> "(?s).*" + ), + matchPVals = true + ) // Commit should release the instance allowing other threads to load new version db.commit() @@ -720,9 +755,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared // Another thread should not be able to load while current thread is using it db.load(2) - intercept[IllegalStateException] { + ex = intercept[SparkException] { ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) } } + checkError( + ex, + errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR", + parameters = Map( + "loggingId" -> "\\[Thread-\\d+\\]", + 
"newAcquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]", + "acquiredThreadInfo" -> "\\[ThreadId: Some\\(\\d+\\)\\]", + "timeWaitedMs" -> "\\d+", + "stackTraceOutput" -> "(?s).*" + ), + matchPVals = true + ) // Rollback should release the instance allowing other threads to load new version db.rollback() @@ -752,6 +799,24 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } test("checkpoint metadata serde roundtrip") { + // expect read metadata error when metadata uses unsupported version + withTempDir { dir => + val file2 = new File(dir, "json") + val json2 = """{"sstFiles":[],"numKeys":0}""" + FileUtils.write(file2, s"v2\n$json2") + val e = intercept[SparkException] { + RocksDBCheckpointMetadata.readFromFile(file2) + } + checkError( + e, + errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT", + parameters = Map( + "expectedVersion" -> "v2", + "actualVersion" -> "v1" + ) + ) + } + def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = { assert(metadata.json == json) withTempDir { dir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 02aa12b325f..6c4e259bac5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -236,22 +236,40 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] // Corrupt snapshot file and verify that it throws error assert(getData(provider, snapshotVersion) === Set(("a", 0) -> snapshotVersion)) corruptFile(provider, snapshotVersion, isSnapshot = true) - intercept[Exception] { + var e = intercept[SparkException] { getData(provider, snapshotVersion) } + checkError( + e, + errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED", + parameters = Map.empty + ) // Corrupt delta file and verify that it throws error assert(getData(provider, snapshotVersion - 1) === Set(("a", 0) -> (snapshotVersion - 1))) corruptFile(provider, snapshotVersion - 1, isSnapshot = false) - intercept[Exception] { + e = intercept[SparkException] { getData(provider, snapshotVersion - 1) } + checkError( + e, + errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED", + parameters = Map.empty + ) // Delete delta file and verify that it throws error deleteFilesEarlierThanVersion(provider, snapshotVersion) - intercept[Exception] { + e = intercept[SparkException] { getData(provider, snapshotVersion - 1) } + checkError( + e.getCause.asInstanceOf[SparkThrowable], + errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS", + parameters = Map( + "fileToRead" -> s"${provider.stateStoreId.storeCheckpointLocation()}/1.delta", + "clazz" -> s"${provider.toString}" + ) + ) } } @@ -900,12 +918,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] assert(getLatestData(provider) === Set(("b", 0) -> 2)) // Trying to get newer versions should fail - intercept[Exception] { + var e = intercept[SparkException] { provider.getStore(2) } - intercept[Exception] { + assert(e.getCause.isInstanceOf[SparkException]) + assert(e.getCause.getMessage.contains("does not exist")) + + e = intercept[SparkException] { getData(provider, 2) } + assert(e.getCause.isInstanceOf[SparkException]) + assert(e.getCause.getMessage.contains("does not exist")) // New updates to the reloaded store with new version, 
      // and does not change old version
      tryWithProviderResource(newStoreProvider(store.id)) { reloadedProvider =>
@@ -1043,9 +1066,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   testWithAllCodec("getStore with invalid versions") {
     tryWithProviderResource(newStoreProvider()) { provider =>
       def checkInvalidVersion(version: Int): Unit = {
-        intercept[Exception] {
+        val e = intercept[SparkException] {
           provider.getStore(version)
         }
+        checkError(
+          e,
+          errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+          parameters = Map.empty
+        )
       }
 
       checkInvalidVersion(-1)
@@ -1196,16 +1224,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       val hadoopConf = new Configuration()
 
       // Verify that trying to get incorrect versions throw errors
-      intercept[IllegalArgumentException] {
+      var e = intercept[SparkException] {
         StateStore.get(
           storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
       }
-      assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
-
-      intercept[IllegalStateException] {
+      checkError(
+        e,
+        errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
+        parameters = Map(
+          "version" -> "-1"
+        )
+      )
+
+      e = intercept[SparkException] {
         StateStore.get(
           storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
       }
+      checkError(
+        e.getCause.asInstanceOf[SparkThrowable],
+        errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
+        parameters = Map(
+          "fileToRead" -> s"$dir/0/0/1.delta",
+          "clazz" -> "HDFSStateStoreProvider\\[.+\\]"
+        ),
+        matchPVals = true
+      )
 
       // Increase version of the store and try to get again
       val store0 = StateStore.get(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
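
For readers following the patch, every load entry point above applies the same shape: validate the requested version, attempt the load, and wrap anything that escapes in the new CANNOT_LOAD_STATE_STORE error class. Below is a minimal, self-contained sketch of that shape; the `LoadStateSketch` object and its method names are illustrative stand-ins, not Spark's actual API, while the `SparkException` constructor and the error-class names are taken directly from the patch.

```scala
import org.apache.spark.SparkException

// Illustrative stand-ins for the QueryExecutionErrors helpers added in this commit.
object LoadStateSketch {
  // Categorized error for an invalid version, mirroring unexpectedStateStoreVersion.
  def unexpectedVersion(version: Long): Throwable =
    new SparkException(
      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_VERSION",
      messageParameters = Map("version" -> version.toString),
      cause = null)

  // Catch-all wrapper, mirroring cannotLoadStore.
  def cannotLoadStore(e: Throwable): Throwable =
    new SparkException(
      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
      messageParameters = Map.empty,
      cause = e)

  // The getStore/getReadStore shape after this change: guard, load, wrap.
  def getStore[T](version: Long)(load: Long => T): T = {
    try {
      if (version < 0) {
        throw unexpectedVersion(version)
      }
      load(version)
    } catch {
      case e: Throwable => throw cannotLoadStore(e)
    }
  }
}
```

Because the version guard sits inside the try block, a negative version surfaces as CANNOT_LOAD_STATE_STORE.UNCATEGORIZED with the UNEXPECTED_VERSION error as its cause, which is exactly what the updated RocksDBSuite and StateStoreSuite assertions check via `checkError` and `e.getCause`.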