This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d02fbba6491 [SPARK-46342][SQL] Replace `IllegalStateException` by `SparkException.internalError` in sql
d02fbba6491 is described below

commit d02fbba6491fd17dc6bfc1a416971af7544952f3
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sun Dec 10 11:24:02 2023 -0800

    [SPARK-46342][SQL] Replace `IllegalStateException` by `SparkException.internalError` in sql
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to replace all `IllegalStateException` exceptions in the `sql` project, except for `streaming`, with `SparkException.internalError`.
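    
    A minimal sketch of the pattern applied throughout this patch (the helper `requireTimeZone` is hypothetical and used only for illustration): `SparkException.internalError` raises a `SparkException` tagged with the `INTERNAL_ERROR` error class instead of a bare `IllegalStateException`.
    
    ```scala
    import org.apache.spark.SparkException
    
    // Before: a plain JVM exception with no error class.
    //   throw new IllegalStateException("Missing timezoneId where it is mandatory.")
    
    // After: an internal error reported through the unified error framework.
    // (Hypothetical helper, not part of this patch.)
    def requireTimeZone(timeZoneId: String): String = {
      if (timeZoneId == null) {
        // Yields errorClass = "INTERNAL_ERROR" with
        // parameters = Map("message" -> "Missing timezoneId where it is mandatory.")
        throw SparkException.internalError("Missing timezoneId where it is mandatory.")
      }
      timeZoneId
    }
    ```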
    
    ### Why are the changes needed?
    This is part of the migration to the new error framework and error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    No, users shouldn't encounter `IllegalStateException` in regular cases.
    
    ### How was this patch tested?
    Using the existing GitHub Actions (GAs).
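    
    As a hedged illustration of how the affected suites adapt (mirroring the `ArrowUtilsSuite` change below; the suite name `InternalErrorSuite` is hypothetical, and `checkError` is the assertion helper from `SparkFunSuite`):
    
    ```scala
    import org.apache.spark.{SparkException, SparkFunSuite}
    
    class InternalErrorSuite extends SparkFunSuite {
      test("internal errors carry the INTERNAL_ERROR error class") {
        checkError(
          exception = intercept[SparkException] {
            throw SparkException.internalError("Missing timezoneId where it is mandatory.")
          },
          errorClass = "INTERNAL_ERROR",
          parameters = Map("message" -> "Missing timezoneId where it is mandatory."))
      }
    }
    ```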
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44275 from MaxGekk/replace-ise-by-internal-error.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/util/TimestampFormatter.scala    |  5 +++--
 .../scala/org/apache/spark/sql/util/ArrowUtils.scala    |  3 ++-
 .../execution/datasources/v2/DataSourceV2Relation.scala |  2 +-
 .../org/apache/spark/sql/util/ArrowUtilsSuite.scala     | 12 +++++++-----
 .../main/scala/org/apache/spark/sql/SparkSession.scala  |  6 +++---
 .../apache/spark/sql/api/python/PythonSQLUtils.scala    |  3 ++-
 .../sql/catalyst/analysis/ResolveSessionCatalog.scala   |  3 ++-
 .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala |  3 ++-
 .../org/apache/spark/sql/execution/SQLExecution.scala   |  4 ++--
 .../org/apache/spark/sql/execution/SparkSqlParser.scala |  3 ++-
 .../apache/spark/sql/execution/SparkStrategies.scala    | 17 +++++++++--------
 .../spark/sql/execution/WholeStageCodegenExec.scala     |  4 ++--
 .../sql/execution/adaptive/AQEShuffleReadExec.scala     |  8 ++++----
 .../spark/sql/execution/adaptive/QueryStageExec.scala   |  8 ++++----
 .../execution/aggregate/AggregateCodegenSupport.scala   |  3 ++-
 .../sql/execution/aggregate/BaseAggregateExec.scala     |  7 ++++---
 .../execution/aggregate/ObjectAggregationIterator.scala |  4 ++--
 .../aggregate/TungstenAggregationIterator.scala         |  4 ++--
 .../sql/execution/aggregate/UpdatingSessionsExec.scala  |  3 ++-
 .../execution/analysis/DetectAmbiguousSelfJoin.scala    |  4 +++-
 .../spark/sql/execution/basicPhysicalOperators.scala    |  8 ++++----
 .../spark/sql/execution/columnar/InMemoryRelation.scala |  4 ++--
 .../columnar/compression/compressionSchemes.scala       |  5 +++--
 .../spark/sql/execution/datasources/DataSource.scala    |  4 ++--
 .../sql/execution/datasources/DataSourceUtils.scala     |  6 +++---
 .../sql/execution/datasources/jdbc/DriverRegistry.scala |  3 ++-
 .../datasources/parquet/ParquetWriteSupport.scala       |  4 ++--
 .../execution/datasources/v2/DataSourceV2Strategy.scala |  6 ++++--
 .../sql/execution/exchange/ShuffleExchangeExec.scala    |  4 ++--
 .../apache/spark/sql/execution/metric/SQLMetrics.scala  |  4 ++--
 .../spark/sql/execution/python/EvaluatePython.scala     |  5 +++--
 .../spark/sql/execution/python/ExtractPythonUDFs.scala  |  7 ++++---
 .../python/FlatMapGroupsInPandasWithStateExec.scala     |  4 ++--
 .../spark/sql/execution/window/AggregateProcessor.scala |  3 ++-
 .../execution/window/WindowEvaluatorFactoryBase.scala   |  9 +++++----
 .../apache/spark/sql/expressions/ReduceAggregator.scala |  3 ++-
 .../org/apache/spark/sql/SparkSessionBuilderSuite.scala |  2 +-
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala | 13 ++++++++-----
 .../spark/sql/execution/WholeStageCodegenSuite.scala    | 13 ++++++++-----
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 12 +++++++-----
 .../spark/sql/expressions/ReduceAggregatorSuite.scala   |  4 ++--
 .../org/apache/spark/sql/hive/HiveInspectors.scala      |  3 ++-
 .../apache/spark/sql/hive/client/HiveClientImpl.scala   |  2 +-
 .../apache/spark/sql/hive/execution/HiveTempPath.scala  |  5 +++--
 44 files changed, 136 insertions(+), 103 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index df146e0dbfd..9539ced52dc 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.time.FastDateFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, 
LENIENT_SIMPLE_DATE_FORMAT}
 import org.apache.spark.sql.catalyst.util.RebaseDateTime._
@@ -90,7 +91,7 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[DateTimeException])
   @throws(classOf[IllegalStateException])
   def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long =
-    throw new IllegalStateException(
+    throw SparkException.internalError(
       s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` 
should be " +
         "implemented in the formatter of timestamp without time zone")
 
@@ -137,7 +138,7 @@ sealed trait TimestampFormatter extends Serializable {
 
   @throws(classOf[IllegalStateException])
   def format(localDateTime: LocalDateTime): String =
-    throw new IllegalStateException(
+    throw SparkException.internalError(
       s"The method `format(localDateTime: LocalDateTime)` should be 
implemented in the formatter " +
         "of timestamp without time zone")
 
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
index 99c46c01785..f01014e1edb 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
@@ -26,6 +26,7 @@ import org.apache.arrow.vector.complex.MapVector
 import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, 
IntervalUnit, TimeUnit}
 import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.errors.ExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.util.ArrayImplicits._
@@ -53,7 +54,7 @@ private[sql] object ArrowUtils {
     case DecimalType.Fixed(precision, scale) => new 
ArrowType.Decimal(precision, scale)
     case DateType => new ArrowType.Date(DateUnit.DAY)
     case TimestampType if timeZoneId == null =>
-      throw new IllegalStateException("Missing timezoneId where it is 
mandatory.")
+      throw SparkException.internalError("Missing timezoneId where it is 
mandatory.")
     case TimestampType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, 
timeZoneId)
     case TimestampNTZType =>
       new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 573b0274e95..8dae9904bc8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -91,7 +91,7 @@ case class DataSourceV2Relation(
       // when testing, throw an exception if this computeStats method is 
called because stats should
       // not be accessed before pushing the projection and filters to create a 
scan. otherwise, the
       // stats are not accurate because they are based on a full table scan of 
all columns.
-      throw new IllegalStateException(
+      throw SparkException.internalError(
         s"BUG: computeStats called before pushdown on DSv2 relation: $name")
     } else {
       // when not testing, return stats because bad stats are better than 
failing a query
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
index 28ed061a71b..c0fa43ff9bd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala
@@ -21,7 +21,7 @@ import java.time.ZoneId
 
 import org.apache.arrow.vector.types.pojo.ArrowType
 
-import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkFunSuite, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.LA
 import org.apache.spark.sql.types._
 
@@ -50,10 +50,12 @@ class ArrowUtilsSuite extends SparkFunSuite {
     roundtrip(DateType)
     roundtrip(YearMonthIntervalType())
     roundtrip(DayTimeIntervalType())
-    val tsExMsg = intercept[IllegalStateException] {
-      roundtrip(TimestampType)
-    }
-    assert(tsExMsg.getMessage.contains("timezoneId"))
+    checkError(
+      exception = intercept[SparkException] {
+        roundtrip(TimestampType)
+      },
+      errorClass = "INTERNAL_ERROR",
+      parameters = Map("message" -> "Missing timezoneId where it is 
mandatory."))
     checkError(
       exception = intercept[SparkUnsupportedOperationException] {
         ArrowUtils.fromArrowType(new ArrowType.Int(8, false))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 9bc60f067dd..15eeca87dcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, 
SparkException, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, 
Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
@@ -1217,7 +1217,7 @@ object SparkSession extends Logging {
    */
   def active: SparkSession = {
     getActiveSession.getOrElse(getDefaultSession.getOrElse(
-      throw new IllegalStateException("No active or default Spark session 
found")))
+      throw SparkException.internalError("No active or default Spark session 
found")))
   }
 
   /**
@@ -1316,7 +1316,7 @@ object SparkSession extends Logging {
   private def assertOnDriver(): Unit = {
     if (TaskContext.get() != null) {
       // we're accessing it during task execution, fail.
-      throw new IllegalStateException(
+      throw SparkException.internalError(
         "SparkSession should only be created and accessed on the driver.")
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 3f0e9369c61..62e6cc07b3e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -24,6 +24,7 @@ import java.util.Locale
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
+import org.apache.spark.SparkException
 import org.apache.spark.api.python.DechunkedInputStream
 import org.apache.spark.internal.Logging
 import org.apache.spark.security.SocketAuthServer
@@ -159,7 +160,7 @@ private[sql] object PythonSQLUtils extends Logging {
       case "HOUR" => Column(zero.copy(hours = e.expr))
       case "MINUTE" => Column(zero.copy(mins = e.expr))
       case "SECOND" => Column(zero.copy(secs = e.expr))
-      case _ => throw new IllegalStateException(s"Got the unexpected unit 
'$unit'.")
+      case _ => throw SparkException.internalError(s"Got the unexpected unit 
'$unit'.")
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index d44de0b260b..c00a4331803 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
@@ -154,7 +155,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
           throw QueryCompilationErrors.commandNotSupportNestedColumnError(
             "DESC TABLE COLUMN", toPrettySQL(child))
         case _ =>
-          throw new IllegalStateException(s"[BUG] unexpected column 
expression: $column")
+          throw SparkException.internalError(s"[BUG] unexpected column 
expression: $column")
       }
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the 
catalog is resolved to the
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 00b1ec749d7..1de7a565f11 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.Locale
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -150,7 +151,7 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
             LocalRelation(partAttrs, partitionData)
 
           case _ =>
-            throw new IllegalStateException(s"unrecognized table scan node: 
$relation, " +
+            throw SparkException.internalError(s"unrecognized table scan node: 
$relation, " +
               s"please turn off ${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try 
again.")
         }
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index b96b9c25dda..e839d2c0691 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.{ErrorMessageFormat, SparkThrowable, 
SparkThrowableHelper}
+import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable, 
SparkThrowableHelper}
 import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, 
SPARK_JOB_INTERRUPT_ON_CANCEL}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, 
SPARK_EXECUTOR_PREFIX}
@@ -58,7 +58,7 @@ object SQLExecution extends Logging {
       // started execution of a query didn't call withNewExecutionId. The 
execution ID should be
       // set by calling withNewExecutionId in the action that begins 
execution, like
       // Dataset.collect or DataFrameWriter.insertInto.
-      throw new IllegalStateException("Execution ID should be set")
+      throw SparkException.internalError("Execution ID should be set")
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d8e5d4f2270..2ef68887e87 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, 
PersistedView, UnresolvedFunctionName, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -269,7 +270,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     } else if (ctx.stringLit() != null) {
       SetCatalogCommand(string(visitStringLit(ctx.stringLit())))
     } else {
-      throw new IllegalStateException("Invalid catalog name")
+      throw SparkException.internalError("Invalid catalog name")
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index df770bd5eee..304ce0cd751 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.Locale
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -552,7 +553,7 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         if (distinctAggChildSets.length > 1) {
           // This is a sanity check. We should not reach here when we have 
multiple distinct
           // column sets. Our `RewriteDistinctAggregates` should take care 
this case.
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             "You hit a query analyzer bug. Please report your query to Spark 
user mailing list.")
         }
 
@@ -782,27 +783,27 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: 
Nil
 
       case logical.Distinct(child) =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "logical distinct operator should have been replaced by aggregate in 
the optimizer")
       case logical.Intersect(left, right, false) =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "logical intersect  operator should have been replaced by semi-join 
in the optimizer")
       case logical.Intersect(left, right, true) =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "logical intersect operator should have been replaced by union, 
aggregate" +
             " and generate operators in the optimizer")
       case logical.Except(left, right, false) =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "logical except operator should have been replaced by anti-join in 
the optimizer")
       case logical.Except(left, right, true) =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "logical except (all) operator should have been replaced by union, 
aggregate" +
             " and generate operators in the optimizer")
       case logical.ResolvedHint(child, hints) =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "ResolvedHint operator should have been replaced by join hint in the 
optimizer")
       case Deduplicate(_, child) if !child.isStreaming =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           "Deduplicate operator for non streaming data source should have been 
replaced " +
             "by aggregate in the optimizer")
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 0aefb0649d0..058df24fc13 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import org.apache.spark.broadcast
+import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -402,7 +402,7 @@ trait CodegenSupport extends SparkPlan {
       val errMsg = "Only leaf nodes and blocking nodes need to call 
'limitNotReachedCond' " +
         "in its data producing loop."
       if (Utils.isTesting) {
-        throw new IllegalStateException(errMsg)
+        throw SparkException.internalError(errMsg)
       } else {
         logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.")
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index 6b39ac70a62..12e8d0e2c60 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -69,7 +69,7 @@ case class AQEShuffleReadExec private(
             case other => other
           }
         case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+          throw SparkException.internalError("operating on canonicalization 
plan")
       }
     } else if (isCoalescedRead) {
       // For coalesced shuffle read, the data distribution is not changed, 
only the number of
@@ -90,7 +90,7 @@ case class AQEShuffleReadExec private(
         case r: RoundRobinPartitioning =>
           r.copy(numPartitions = partitionSpecs.length)
         case other @ SinglePartition =>
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             "Unexpected partitioning for coalesced shuffle read: " + other)
         case _ =>
           // Spark plugins may have custom partitioning and may replace this 
operator
@@ -163,7 +163,7 @@ case class AQEShuffleReadExec private(
           assert(p.dataSize.isDefined)
           p.dataSize.get
         case p: PartialReducerPartitionSpec => p.dataSize
-        case p => throw new IllegalStateException(s"unexpected $p")
+        case p => throw SparkException.internalError(s"unexpected $p")
       })
     } else {
       None
@@ -253,7 +253,7 @@ case class AQEShuffleReadExec private(
         sendDriverMetrics()
         stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
       case _ =>
-        throw new IllegalStateException("operating on canonicalized plan")
+        throw SparkException.internalError("operating on canonicalized plan")
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index b941feb12fc..89e9de8b084 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import scala.concurrent.Future
 
-import org.apache.spark.{FutureAction, MapOutputStatistics}
+import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -179,7 +179,7 @@ case class ShuffleQueryStageExec(
     case s: ShuffleExchangeLike => s
     case ReusedExchangeExec(_, s: ShuffleExchangeLike) => s
     case _ =>
-      throw new IllegalStateException(s"wrong plan for shuffle stage:\n 
${plan.treeString}")
+      throw SparkException.internalError(s"wrong plan for shuffle stage:\n 
${plan.treeString}")
   }
 
   def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
@@ -233,7 +233,7 @@ case class BroadcastQueryStageExec(
     case b: BroadcastExchangeLike => b
     case ReusedExchangeExec(_, b: BroadcastExchangeLike) => b
     case _ =>
-      throw new IllegalStateException(s"wrong plan for broadcast stage:\n 
${plan.treeString}")
+      throw SparkException.internalError(s"wrong plan for broadcast stage:\n 
${plan.treeString}")
   }
 
   override protected def doMaterialize(): Future[Any] = {
@@ -273,7 +273,7 @@ case class TableCacheQueryStageExec(
   @transient val inMemoryTableScan = plan match {
     case i: InMemoryTableScanExec => i
     case _ =>
-      throw new IllegalStateException(s"wrong plan for table cache stage:\n 
${plan.treeString}")
+      throw SparkException.internalError(s"wrong plan for table cache stage:\n 
${plan.treeString}")
   }
 
   @transient
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
index 1377a984223..9523bf1a1c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression, ExpressionEquals, UnsafeRow}
@@ -343,7 +344,7 @@ trait AggregateCodegenSupport
           "length of at least one split function went over the JVM limit: " +
           CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH
         if (Utils.isTesting) {
-          throw new IllegalStateException(errMsg)
+          throw SparkException.internalError(errMsg)
         } else {
           logInfo(errMsg)
           None
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
index 2427a39751f..5391d580759 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, NamedExpression}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Final, PartialMerge}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, UnspecifiedDistribution}
@@ -102,9 +103,9 @@ trait BaseAggregateExec extends UnaryExecNode with 
PartitioningPreservingUnaryEx
               StatefulOperatorPartitioning.getCompatibleDistribution(
                 exprs, parts, conf) :: Nil
 
-            case _ =>
-              throw new IllegalStateException("Expected to set the number of 
partitions before " +
-                "constructing required child distribution!")
+            case _ => throw SparkException.internalError(
+              "Expected to set the number of partitions before " +
+              "constructing required child distribution!")
           }
         } else {
           ClusteredDistribution(exprs) :: Nil
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index 4cc251a99db..57b8fd8570f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.aggregate
 
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -109,7 +109,7 @@ class ObjectAggregationIterator(
       val defaultAggregationBuffer = createNewAggregationBuffer()
       generateOutput(UnsafeRow.createFromByteArray(0, 0), 
defaultAggregationBuffer)
     } else {
-      throw new IllegalStateException(
+      throw SparkException.internalError(
         "This method should not be called when groupingExpressions is not 
empty.")
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index db567dcd15b..1ebf0d143bd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.aggregate
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.SparkOutOfMemoryError
 import org.apache.spark.sql.catalyst.InternalRow
@@ -461,7 +461,7 @@ class TungstenAggregationIterator(
       hashMap.free()
       resultCopy
     } else {
-      throw new IllegalStateException(
+      throw SparkException.internalError(
         "This method should not be called when groupingExpressions is not 
empty.")
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
index fee7e29f8ad..b5dfd4639d8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
SortOrder}
@@ -73,7 +74,7 @@ case class UpdatingSessionsExec(
               groupingWithoutSessionExpression, parts, conf) :: Nil
 
           case _ =>
-            throw new IllegalStateException("Expected to set the number of 
partitions before " +
+            throw SparkException.internalError("Expected to set the number of 
partitions before " +
               "constructing required child distribution!")
         }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
index 7e9628c3851..c2925a3ba59 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{Column, Dataset}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Cast, Equality, Expression, ExprId}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
@@ -95,7 +96,8 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] {
           colRefs.foreach { ref =>
             if (ids.contains(ref.datasetId)) {
               if (ref.colPos < 0 || ref.colPos >= p.output.length) {
-                throw new IllegalStateException("[BUG] Hit an invalid Dataset 
column reference: " +
+                throw SparkException.internalError(
+                  "Hit an invalid Dataset column reference: " +
                   s"$ref. Please open a JIRA ticket to report it.")
               } else {
                 // When self-join happens, the analyzer asks the right side 
plan to generate
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 2fd79935507..083858e4fe8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, 
TaskContext}
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, 
SparkException, TaskContext}
 import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -866,15 +866,15 @@ case class SubqueryExec(name: String, child: SparkPlan, 
maxNumRows: Option[Int]
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    throw new IllegalStateException("SubqueryExec.doExecute should never be 
called")
+    throw SparkException.internalError("SubqueryExec.doExecute should never be 
called")
   }
 
   override def executeTake(n: Int): Array[InternalRow] = {
-    throw new IllegalStateException("SubqueryExec.executeTake should never be 
called")
+    throw SparkException.internalError("SubqueryExec.executeTake should never 
be called")
   }
 
   override def executeTail(n: Int): Array[InternalRow] = {
-    throw new IllegalStateException("SubqueryExec.executeTail should never be 
called")
+    throw SparkException.internalError("SubqueryExec.executeTail should never 
be called")
   }
 
   override def stringArgs: Iterator[Any] = Iterator(name, child) ++ 
Iterator(s"[id=#$id]")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 750f49c25b6..af958208afd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -61,7 +61,7 @@ class DefaultCachedBatchSerializer extends 
SimpleMetricsCachedBatchSerializer {
       schema: Seq[Attribute],
       storageLevel: StorageLevel,
       conf: SQLConf): RDD[CachedBatch] =
-    throw new IllegalStateException("Columnar input is not supported")
+    throw SparkException.internalError("Columnar input is not supported")
 
   override def convertInternalRowToCachedBatch(
       input: RDD[InternalRow],
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
index 453bc47c19f..46044f6919d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala
@@ -22,6 +22,7 @@ import java.nio.ByteOrder
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.types.{PhysicalBooleanType, 
PhysicalByteType, PhysicalDataType, PhysicalDoubleType, PhysicalFloatType, 
PhysicalIntegerType, PhysicalLongType, PhysicalShortType, PhysicalStringType}
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -350,7 +351,7 @@ private[columnar] case object RunLengthEncoding extends 
CompressionScheme {
           decompress0(columnVector, capacity, getInt, putInt)
         case _: PhysicalLongType =>
           decompress0(columnVector, capacity, getLong, putLong)
-        case _ => throw new IllegalStateException("Not supported type in 
RunLengthEncoding.")
+        case _ => throw SparkException.internalError("Not supported type in 
RunLengthEncoding.")
       }
     }
   }
@@ -520,7 +521,7 @@ private[columnar] case object DictionaryEncoding extends 
CompressionScheme {
             }
             pos += 1
           }
-        case _ => throw new IllegalStateException("Not supported type in 
DictionaryEncoding.")
+        case _ => throw SparkException.internalError("Not supported type in 
DictionaryEncoding.")
       }
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 668d2538e03..71b6d4b886b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -513,7 +513,7 @@ case class DataSource(
         qe.assertCommandExecuted()
         // Replace the schema with that of the DataFrame we just wrote out to 
avoid re-inferring
         copy(userSpecifiedSchema = 
Some(outputColumns.toStructType.asNullable)).resolveRelation()
-      case _ => throw new IllegalStateException(
+      case _ => throw SparkException.internalError(
         s"${providingClass.getCanonicalName} does not allow create table as 
select.")
     }
   }
@@ -531,7 +531,7 @@ case class DataSource(
         disallowWritingIntervals(data.schema.map(_.dataType), 
forbidAnsiIntervals = false)
         DataSource.validateSchema(data.schema, sparkSession.sessionState.conf)
         planForWritingFileFormat(format, mode, data)
-      case _ => throw new IllegalStateException(
+      case _ => throw SparkException.internalError(
         s"${providingClass.getCanonicalName} does not allow create table as 
select.")
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index a5c1f6613b4..cf02826baf3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkException, SparkUpgradeException}
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, 
SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, 
SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression, ExpressionSet, PredicateHelper}
@@ -172,7 +172,7 @@ object DataSourceUtils extends PredicateHelper {
         (SQLConf.PARQUET_REBASE_MODE_IN_READ.key, 
ParquetOptions.DATETIME_REBASE_MODE)
       case "Avro" =>
         (SQLConf.AVRO_REBASE_MODE_IN_READ.key, "datetimeRebaseMode")
-      case _ => throw new IllegalStateException(s"Unrecognized format 
$format.")
+      case _ => throw SparkException.internalError(s"Unrecognized format 
$format.")
     }
     QueryExecutionErrors.sparkUpgradeInReadingDatesError(format, config, 
option)
   }
@@ -182,7 +182,7 @@ object DataSourceUtils extends PredicateHelper {
       case "Parquet INT96" => SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key
       case "Parquet" => SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key
       case "Avro" => SQLConf.AVRO_REBASE_MODE_IN_WRITE.key
-      case _ => throw new IllegalStateException(s"Unrecognized format 
$format.")
+      case _ => throw SparkException.internalError(s"Unrecognized format 
$format.")
     }
     QueryExecutionErrors.sparkUpgradeInWritingDatesError(format, config)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
index 421fa4ddace..dd429778e1f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
@@ -22,6 +22,7 @@ import java.sql.{Driver, DriverManager}
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -65,7 +66,7 @@ object DriverRegistry extends Logging {
       case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == 
className => d.wrapped
       case d if d.getClass.getCanonicalName == className => d
     }.getOrElse {
-      throw new IllegalStateException(
+      throw SparkException.internalError(
         s"Did not find registered driver with class $className")
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e410789504e..7194033e603 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
-import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, 
SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, 
SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -263,7 +263,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
 
       case t: UserDefinedType[_] => makeWriter(t.sqlType)
 
-      case _ => throw new IllegalStateException(s"Unsupported data type 
$dataType.")
+      case _ => throw SparkException.internalError(s"Unsupported data type 
$dataType.")
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 16889b247f2..fe3140c8030 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -629,7 +629,8 @@ private[sql] object DataSourceV2Strategy extends Logging {
         expressions.Not(rebuildExpressionFromFilter(not.child(), 
translatedFilterToExpr))
       case _ =>
         translatedFilterToExpr.getOrElse(predicate,
-          throw new IllegalStateException("Failed to rebuild Expression for 
filter: " + predicate))
+          throw SparkException.internalError(
+            "Failed to rebuild Expression for filter: " + predicate))
     }
   }
 
@@ -642,7 +643,8 @@ private[sql] object DataSourceV2Strategy extends Logging {
   protected[sql] def translateRuntimeFilterV2(expr: Expression): 
Option[Predicate] = expr match {
     case in @ InSubqueryExec(PushableColumnAndNestedColumn(name), _, _, _, _, 
_) =>
       val values = in.values().getOrElse {
-        throw new IllegalStateException(s"Can't translate $in to v2 Predicate, 
no subquery result")
+        throw SparkException.internalError(
+          s"Can't translate $in to v2 Predicate, no subquery result")
       }
       val literals = values.map(LiteralValue(_, in.child.dataType))
       Some(new Predicate("IN", FieldReference(name) +: literals))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 509f1e6a1e4..69705afbb7c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -306,7 +306,7 @@ object ShuffleExchangeExec {
           case (partition, index) => 
(partition.toSeq(expressions.map(_.dataType)), index)
         }.toMap
         new KeyGroupedPartitioner(mutable.Map(valueMap.toSeq: _*), n)
-      case _ => throw new IllegalStateException(s"Exchange not implemented for 
$newPartitioning")
+      case _ => throw SparkException.internalError(s"Exchange not implemented 
for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match 
{
@@ -334,7 +334,7 @@ object ShuffleExchangeExec {
       case SinglePartition => identity
       case KeyGroupedPartitioning(expressions, _, _, _) =>
         row => bindReferences(expressions, outputAttributes).map(_.eval(row))
-      case _ => throw new IllegalStateException(s"Exchange not implemented for 
$newPartitioning")
+      case _ => throw SparkException.internalError(s"Exchange not implemented 
for $newPartitioning")
     }
 
     val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 69d288ae75c..f0e58766dc6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -230,7 +230,7 @@ object SQLMetrics {
       } else if (metricsType == NS_TIMING_METRIC) {
         duration => Utils.msDurationToString(duration.nanos.toMillis)
       } else {
-        throw new IllegalStateException(s"unexpected metrics type: 
$metricsType")
+        throw SparkException.internalError(s"unexpected metrics type: 
$metricsType")
       }
 
       val validValues = values.filter(_ >= 0)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 48db2560da9..3b18e843c6f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._
 
 import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
 
+import org.apache.spark.SparkException
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -182,9 +183,9 @@ object EvaluatePython {
         case c if c.getClass.isArray =>
           val array = c.asInstanceOf[Array[_]]
           if (array.length != fields.length) {
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               s"Input row doesn't have expected number of values required by 
the schema. " +
-                s"${fields.length} fields are required while ${array.length} 
values are provided."
+              s"${fields.length} fields are required while ${array.length} 
values are provided."
             )
           }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index 711da3ff3f1..dcd6603f649 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.python
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkException
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -262,7 +263,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] {
 
           val evalTypes = validUdfs.map(_.evalType).toSet
           if (evalTypes.size != 1) {
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               "Expected udfs have the same evalType but got different 
evalTypes: " +
               evalTypes.mkString(","))
           }
@@ -274,7 +275,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] {
                  | PythonEvalType.SQL_ARROW_BATCHED_UDF =>
               ArrowEvalPython(validUdfs, resultAttrs, child, evalType)
             case _ =>
-              throw new IllegalStateException("Unexpected UDF evalType")
+              throw SparkException.internalError("Unexpected UDF evalType")
           }
 
           attributeMap ++= 
validUdfs.map(canonicalizeDeterministic).zip(resultAttrs)
@@ -286,7 +287,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] {
       // Other cases are disallowed as they are ambiguous or would require a 
cartesian
       // product.
       
udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach { 
udf =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           s"Invalid PythonUDF $udf, requires attributes from more than one 
child.")
       }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
index 32d010b00d0..105c5ca6493 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.python
 
-import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.{JobArtifactSet, SparkException, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -145,7 +145,7 @@ case class FlatMapGroupsInPandasWithStateExec(
           case ProcessingTimeTimeout => batchTimestampMs.get
           case EventTimeTimeout => eventTimeWatermarkForEviction.get
           case _ =>
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               s"Cannot filter timed out keys for $timeoutConf")
         }
         val timingOutPairs = stateManager.getAllState(store).filter { state =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index 68ea33572c8..60ce6b06890 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.window
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -91,7 +92,7 @@ private[window] object AggregateProcessor {
         updateExpressions ++= noOps
         evaluateExpressions += imperative
       case other =>
-        throw new IllegalStateException(s"Unsupported aggregate function: 
$other")
+        throw SparkException.internalError(s"Unsupported aggregate function: 
$other")
     }
 
     // Create the projections.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
index 4491861dd9d..bdaccd43c1b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.window
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Add, 
AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow, 
DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, 
FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral, 
MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression, 
RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimeAdd, 
TimestampAddYMInterval, UnaryMinus, UnboundedFol [...]
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -76,7 +77,7 @@ trait WindowEvaluatorFactoryBase {
         RowBoundOrdering(offset)
 
       case (RowFrame, _) =>
-        throw new IllegalStateException(s"Unhandled bound in windows 
expressions: $bound")
+        throw SparkException.internalError(s"Unhandled bound in windows 
expressions: $bound")
 
       case (RangeFrame, CurrentRow) =>
         val ordering = RowOrdering.create(orderSpec, childOutput)
@@ -119,7 +120,7 @@ trait WindowEvaluatorFactoryBase {
         RangeBoundOrdering(ordering, current, bound)
 
       case (RangeFrame, _) =>
-        throw new IllegalStateException("Non-Zero range offsets are not 
supported for windows " +
+        throw SparkException.internalError("Non-Zero range offsets are not 
supported for windows " +
           "with multiple order expressions.")
     }
   }
@@ -168,7 +169,7 @@ trait WindowEvaluatorFactoryBase {
                 case _ => collect("AGGREGATE", frame, e, f)
               }
            case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
-            case f => throw new IllegalStateException(s"Unsupported window function: $f")
+            case f => throw SparkException.internalError(s"Unsupported window function: $f")
           }
         case _ =>
       }
@@ -275,7 +276,7 @@ trait WindowEvaluatorFactoryBase {
             }
 
           case _ =>
-            throw new IllegalStateException(s"Unsupported factory: $key")
+            throw SparkException.internalError(s"Unsupported factory: $key")
         }
 
        // Keep track of the number of expressions. This is a side-effect in a map...
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
index e897fdfe008..fd3df372a2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.expressions
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
@@ -72,7 +73,7 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T)
 
   override def finish(reduction: (Boolean, T)): T = {
     if (!reduction._1) {
-      throw new IllegalStateException("ReduceAggregator requires at least one input row")
+      throw SparkException.internalError("ReduceAggregator requires at least one input row")
     }
     reduction._2
   }
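
Note (illustrative sketch, not part of the patch): `SparkException.internalError(msg)` wraps the given message in a `SparkException` tagged with the `INTERNAL_ERROR` error class, which is what the updated tests below match on. A minimal Scala illustration, assuming the usual `SparkThrowable` accessors:

    import org.apache.spark.SparkException

    // internalError yields a structured internal error instead of a bare
    // IllegalStateException; the text travels as the "message" parameter.
    val e = SparkException.internalError("ReduceAggregator requires at least one input row")
    assert(e.getErrorClass == "INTERNAL_ERROR")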
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 90082c92291..4ac05373e5a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -104,7 +104,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually {
     SparkSession.clearActiveSession()
     assert(SparkSession.active == session)
     SparkSession.clearDefaultSession()
-    intercept[IllegalStateException](SparkSession.active)
+    intercept[SparkException](SparkSession.active)
     session.stop()
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..058719f265d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -108,11 +108,14 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
     val df = spark.range(10)
     val planner = spark.sessionState.planner
    val deduplicate = Deduplicate(df.queryExecution.analyzed.output, df.queryExecution.analyzed)
-    val err = intercept[IllegalStateException] {
-      planner.plan(deduplicate)
-    }
-    assert(err.getMessage.contains("Deduplicate operator for non streaming data source " +
-      "should have been replaced by aggregate in the optimizer"))
+    checkError(
+      exception = intercept[SparkException] {
+        planner.plan(deduplicate)
+      },
+      errorClass = "INTERNAL_ERROR",
+      parameters = Map(
+        "message" -> ("Deduplicate operator for non streaming data source 
should have been " +
+          "replaced by aggregate in the optimizer")))
   }
 
   test("SPARK-37221: The collect-like API in SparkPlan should support columnar 
output") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index faefc240b79..18c6113416a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -857,16 +857,19 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
         SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
         "spark.sql.CodeGenerator.validParamLength" -> "0") {
       withTable("t") {
-        val expectedErrMsg = "Failed to split aggregate code into small functions"
+        val expectedErrMsg = "Failed to split aggregate code into small functions.*"
         Seq(
           // Test case without keys
           "SELECT AVG(v) FROM VALUES(1) t(v)",
          // Test case with keys
           "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { 
query =>
-          val e = intercept[IllegalStateException] {
-            sql(query).collect()
-          }
-          assert(e.getMessage.contains(expectedErrMsg))
+          checkError(
+            exception = intercept[SparkException] {
+              sql(query).collect()
+            },
+            errorClass = "INTERNAL_ERROR",
+            parameters = Map("message" -> expectedErrMsg),
+            matchPVals = true)
         }
       }
     }
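
Note on `matchPVals = true` above: `checkError` then compares each expected parameter value as a regular expression rather than an exact string, which is why the expected message now ends in `.*`: the runtime message appends codegen details after that fixed prefix. Roughly (the sample `actual` value below is hypothetical):

    val expected = "Failed to split aggregate code into small functions.*"
    val actual = "Failed to split aggregate code into small functions: <hypothetical codegen details>"
    assert(actual.matches(expected)) // regex match, not string equality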
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a76360439e6..a67b1b69f6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1014,11 +1014,13 @@ class AdaptiveQueryExecSuite
       val read = reads.head
       val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
      // we can't just call execute() because that has separate checks for canonicalized plans
-      val ex = intercept[IllegalStateException] {
-        val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
-        c.invokePrivate(doExecute())
-      }
-      assert(ex.getMessage === "operating on canonicalized plan")
+      checkError(
+        exception = intercept[SparkException] {
+          val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
+          c.invokePrivate(doExecute())
+        },
+        errorClass = "INTERNAL_ERROR",
+        parameters = Map("message" -> "operating on canonicalized plan"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
index c1071373287..10122c041c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.expressions
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
@@ -72,7 +72,7 @@ class ReduceAggregatorSuite extends SparkFunSuite {
     val func = (v1: Int, v2: Int) => v1 + v2
    val aggregator: ReduceAggregator[Int] = new ReduceAggregator(func)(Encoders.scalaInt)
 
-    intercept[IllegalStateException] {
+    intercept[SparkException] {
       aggregator.finish(aggregator.zero)
     }
   }
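
Sketch (not part of the patch): since `ReduceAggregatorSuite` extends `SparkFunSuite`, the test above could also pin the error class and message in the style of the other suites touched by this commit:

    checkError(
      exception = intercept[SparkException] {
        aggregator.finish(aggregator.zero)
      },
      errorClass = "INTERNAL_ERROR",
      parameters = Map("message" -> "ReduceAggregator requires at least one input row"))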
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index 849f6f15189..ba87ad37130 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructF
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -916,7 +917,7 @@ private[hive] trait HiveInspectors {
       toInspector(dt.sqlType)
    // We will enumerate all of the possible constant expressions, throw exception if we missed
    case Literal(_, dt) =>
-      throw new IllegalStateException(s"Hive doesn't support the constant type [$dt].")
+      throw SparkException.internalError(s"Hive doesn't support the constant type [$dt].")
    // ideally, we don't test the foldable here(but in optimizer), however, some of the
    // Hive UDF / UDAF requires its argument to be constant objectinspector, we do it eagerly.
    case _ if expr.foldable => toInspector(Literal.create(expr.eval(), expr.dataType))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3b9e2733352..72388a8d4b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -842,7 +842,7 @@ private[hive] class HiveClientImpl(
     val maxResults = 100000
     val results = runHive(sql, maxResults)
     // It is very confusing when you only get back some of the results...
-    if (results.size == maxResults) throw new IllegalStateException("RESULTS POSSIBLY TRUNCATED")
+    if (results.size == maxResults) throw SparkException.internalError("RESULTS POSSIBLY TRUNCATED")
     results
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
index 6fd8892fa1f..b3b9ebea21c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -54,7 +55,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
     if (allSupportedHiveVersions.contains(hiveVersion)) {
       externalTempPath(path, stagingDir)
     } else {
-      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+      throw SparkException.internalError("Unsupported hive version: " + hiveVersion.fullVersion)
     }
   }
 
@@ -143,7 +144,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P
       stagingDirForCreating.foreach { stagingDir =>
         val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)
         if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) {
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             "Cannot create staging directory  '" + stagingDir.toString + "'")
         }
         fs.deleteOnExit(stagingDir)

