HeartSaVioR commented on code in PR #41705:
URL: https://github.com/apache/spark/pull/41705#discussion_r1264759322


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2893,62 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(
+      loggingId: String,
+      newAcquiredThreadInfo: String,
+      AcquiredThreadInfo: String,

Review Comment:
   nit: shall we follow the style convention? `acquiredThreadInfo`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2893,62 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(
+      loggingId: String,
+      newAcquiredThreadInfo: String,
+      AcquiredThreadInfo: String,
+      timeWaitedMs: Long,
+      stackTraceOutput: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+      messageParameters = Map(
+        "loggingId" -> loggingId,
+        "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+        "acquiredThreadInfo" -> AcquiredThreadInfo,
+        "timeWaitedMs" -> timeWaitedMs.toString,
+        "stackTraceOutput" -> stackTraceOutput),
+      cause = null)
+  }
+
+  def cannotReadCheckpoint(expectedVersion: String, actualVersion: String): 
Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+      messageParameters = Map(
+        "expectedVersion" -> expectedVersion,
+        "actualVersion" -> actualVersion),
+      cause = null)
+  }
+
+  def unexpectedFileSize(
+      dfsFile: Path,
+      localFile: File,
+      expectedSize: Long,
+      localFileSize: Long): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNEXPECTED_FILE_SIZE",
+      messageParameters = Map(
+        "dfsFile" -> dfsFile.toString,
+        "localFile" -> localFile.toString,
+        "expectedSize" -> expectedSize.toString,
+        "localFileSize" -> localFileSize.toString
+      ),
+      cause = null)
+  }
+
+  def unexpectedStateStoreVersion(): Throwable = {

Review Comment:
   nit: probably more helpful to debug if we provide the version which is 
considered to be wrong.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -720,7 +755,7 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
 
         // Another thread should not be able to load while current thread is 
using it
         db.load(2)
-        intercept[IllegalStateException] {
+        ex = intercept[SparkException] {

Review Comment:
   nit: Let's either perform validation against ex here, or let it be the same 
as it was.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -876,12 +889,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
       assert(getLatestData(provider) === Set(("b", 0) -> 2))
 
       // Trying to get newer versions should fail
-      intercept[Exception] {
+      var e = intercept[SparkException] {
         provider.getStore(2)
       }
-      intercept[Exception] {
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("does not exist"))

Review Comment:
   What if parameters are different for these error classes? I guess we are 
generally no longer be able to use checkError if it can conditionally match to 
multiple error classes.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File}

Review Comment:
   nit: unnecessary change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to