This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d02fbba6491 [SPARK-46342][SQL] Replace `IllegalStateException` by `SparkException.internalError` in sql
d02fbba6491 is described below

commit d02fbba6491fd17dc6bfc1a416971af7544952f3
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sun Dec 10 11:24:02 2023 -0800

    [SPARK-46342][SQL] Replace `IllegalStateException` by `SparkException.internalError` in sql

    ### What changes were proposed in this pull request?
    In this PR, I propose to replace all `IllegalStateException` exceptions in the `sql` project, except for `streaming`, with `SparkException.internalError`.

    ### Why are the changes needed?
    This is part of the migration onto the new error framework and error classes.

    ### Does this PR introduce _any_ user-facing change?
    No, users shouldn't encounter `IllegalStateException` in regular cases.

    ### How was this patch tested?
    Using the existing GAs (GitHub Actions).

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44275 from MaxGekk/replace-ise-by-internal-error.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/util/TimestampFormatter.scala | 5 +++--
 .../scala/org/apache/spark/sql/util/ArrowUtils.scala | 3 ++-
 .../execution/datasources/v2/DataSourceV2Relation.scala | 2 +-
 .../org/apache/spark/sql/util/ArrowUtilsSuite.scala | 12 +++++++-----
 .../main/scala/org/apache/spark/sql/SparkSession.scala | 6 +++---
 .../apache/spark/sql/api/python/PythonSQLUtils.scala | 3 ++-
 .../sql/catalyst/analysis/ResolveSessionCatalog.scala | 3 ++-
 .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 3 ++-
 .../org/apache/spark/sql/execution/SQLExecution.scala | 4 ++--
 .../org/apache/spark/sql/execution/SparkSqlParser.scala | 3 ++-
 .../apache/spark/sql/execution/SparkStrategies.scala | 17 +++++++++--------
 .../spark/sql/execution/WholeStageCodegenExec.scala | 4 ++--
 .../sql/execution/adaptive/AQEShuffleReadExec.scala | 8 ++++----
 .../spark/sql/execution/adaptive/QueryStageExec.scala | 8 ++++----
 .../execution/aggregate/AggregateCodegenSupport.scala | 3 ++-
 .../sql/execution/aggregate/BaseAggregateExec.scala | 7 ++++---
 .../execution/aggregate/ObjectAggregationIterator.scala | 4 ++--
 .../aggregate/TungstenAggregationIterator.scala | 4 ++--
 .../sql/execution/aggregate/UpdatingSessionsExec.scala | 3 ++-
 .../execution/analysis/DetectAmbiguousSelfJoin.scala | 4 +++-
 .../spark/sql/execution/basicPhysicalOperators.scala | 8 ++++----
 .../spark/sql/execution/columnar/InMemoryRelation.scala | 4 ++--
 .../columnar/compression/compressionSchemes.scala | 5 +++--
 .../spark/sql/execution/datasources/DataSource.scala | 4 ++--
 .../sql/execution/datasources/DataSourceUtils.scala | 6 +++---
 .../sql/execution/datasources/jdbc/DriverRegistry.scala | 3 ++-
 .../datasources/parquet/ParquetWriteSupport.scala | 4 ++--
 .../execution/datasources/v2/DataSourceV2Strategy.scala | 6 ++++--
 .../sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++--
 .../apache/spark/sql/execution/metric/SQLMetrics.scala | 4 ++--
 .../spark/sql/execution/python/EvaluatePython.scala | 5 +++--
 .../spark/sql/execution/python/ExtractPythonUDFs.scala | 7 ++++---
 .../python/FlatMapGroupsInPandasWithStateExec.scala | 4 ++--
 .../spark/sql/execution/window/AggregateProcessor.scala | 3 ++-
 .../execution/window/WindowEvaluatorFactoryBase.scala | 9 +++++----
 .../apache/spark/sql/expressions/ReduceAggregator.scala | 3 ++-
 .../org/apache/spark/sql/SparkSessionBuilderSuite.scala | 2 +-
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala | 
13 ++++++++----- .../spark/sql/execution/WholeStageCodegenSuite.scala | 13 ++++++++----- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 12 +++++++----- .../spark/sql/expressions/ReduceAggregatorSuite.scala | 4 ++-- .../org/apache/spark/sql/hive/HiveInspectors.scala | 3 ++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- .../apache/spark/sql/hive/execution/HiveTempPath.scala | 5 +++-- 44 files changed, 136 insertions(+), 103 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index df146e0dbfd..9539ced52dc 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -29,6 +29,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.time.FastDateFormat +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT} import org.apache.spark.sql.catalyst.util.RebaseDateTime._ @@ -90,7 +91,7 @@ sealed trait TimestampFormatter extends Serializable { @throws(classOf[DateTimeException]) @throws(classOf[IllegalStateException]) def parseWithoutTimeZone(s: String, allowTimeZone: Boolean): Long = - throw new IllegalStateException( + throw SparkException.internalError( s"The method `parseWithoutTimeZone(s: String, allowTimeZone: Boolean)` should be " + "implemented in the formatter of timestamp without time zone") @@ -137,7 +138,7 @@ sealed trait TimestampFormatter extends Serializable { @throws(classOf[IllegalStateException]) def format(localDateTime: LocalDateTime): String = - throw new IllegalStateException( + throw SparkException.internalError( s"The method `format(localDateTime: LocalDateTime)` should be implemented in the formatter " + "of timestamp without time zone") diff --git a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala index 99c46c01785..f01014e1edb 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala @@ -26,6 +26,7 @@ import org.apache.arrow.vector.complex.MapVector import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} +import org.apache.spark.SparkException import org.apache.spark.sql.errors.ExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.util.ArrayImplicits._ @@ -53,7 +54,7 @@ private[sql] object ArrowUtils { case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale) case DateType => new ArrowType.Date(DateUnit.DAY) case TimestampType if timeZoneId == null => - throw new IllegalStateException("Missing timezoneId where it is mandatory.") + throw SparkException.internalError("Missing timezoneId where it is mandatory.") case TimestampType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) case TimestampNTZType => new ArrowType.Timestamp(TimeUnit.MICROSECOND, null) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala 
index 573b0274e95..8dae9904bc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -91,7 +91,7 @@ case class DataSourceV2Relation( // when testing, throw an exception if this computeStats method is called because stats should // not be accessed before pushing the projection and filters to create a scan. otherwise, the // stats are not accurate because they are based on a full table scan of all columns. - throw new IllegalStateException( + throw SparkException.internalError( s"BUG: computeStats called before pushdown on DSv2 relation: $name") } else { // when not testing, return stats because bad stats are better than failing a query diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala index 28ed061a71b..c0fa43ff9bd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala @@ -21,7 +21,7 @@ import java.time.ZoneId import org.apache.arrow.vector.types.pojo.ArrowType -import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException} +import org.apache.spark.{SparkException, SparkFunSuite, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.LA import org.apache.spark.sql.types._ @@ -50,10 +50,12 @@ class ArrowUtilsSuite extends SparkFunSuite { roundtrip(DateType) roundtrip(YearMonthIntervalType()) roundtrip(DayTimeIntervalType()) - val tsExMsg = intercept[IllegalStateException] { - roundtrip(TimestampType) - } - assert(tsExMsg.getMessage.contains("timezoneId")) + checkError( + exception = intercept[SparkException] { + roundtrip(TimestampType) + }, + errorClass = "INTERNAL_ERROR", + parameters = Map("message" -> "Missing timezoneId where it is mandatory.")) checkError( exception = intercept[SparkUnsupportedOperationException] { ArrowUtils.fromArrowType(new ArrowType.Int(8, false)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 9bc60f067dd..15eeca87dcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext} +import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, SparkException, TaskContext} import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging @@ -1217,7 +1217,7 @@ object SparkSession extends Logging { */ def active: SparkSession = { getActiveSession.getOrElse(getDefaultSession.getOrElse( - throw new IllegalStateException("No active or default Spark session found"))) + throw SparkException.internalError("No active or default Spark session found"))) } /** @@ -1316,7 +1316,7 @@ object SparkSession extends Logging { private def assertOnDriver(): Unit = { if (TaskContext.get() != null) { // we're accessing it during task execution, fail. 
- throw new IllegalStateException( + throw SparkException.internalError( "SparkSession should only be created and accessed on the driver.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 3f0e9369c61..62e6cc07b3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -24,6 +24,7 @@ import java.util.Locale import net.razorvine.pickle.{Pickler, Unpickler} +import org.apache.spark.SparkException import org.apache.spark.api.python.DechunkedInputStream import org.apache.spark.internal.Logging import org.apache.spark.security.SocketAuthServer @@ -159,7 +160,7 @@ private[sql] object PythonSQLUtils extends Logging { case "HOUR" => Column(zero.copy(hours = e.expr)) case "MINUTE" => Column(zero.copy(mins = e.expr)) case "SECOND" => Column(zero.copy(secs = e.expr)) - case _ => throw new IllegalStateException(s"Got the unexpected unit '$unit'.") + case _ => throw SparkException.internalError(s"Got the unexpected unit '$unit'.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index d44de0b260b..c00a4331803 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.commons.lang3.StringUtils +import org.apache.spark.SparkException import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec} @@ -154,7 +155,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager) throw QueryCompilationErrors.commandNotSupportNestedColumnError( "DESC TABLE COLUMN", toPrettySQL(child)) case _ => - throw new IllegalStateException(s"[BUG] unexpected column expression: $column") + throw SparkException.internalError(s"[BUG] unexpected column expression: $column") } // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 00b1ec749d7..1de7a565f11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import java.util.Locale +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog} import org.apache.spark.sql.catalyst.expressions._ @@ -150,7 +151,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic LocalRelation(partAttrs, partitionData) case _ => - throw new IllegalStateException(s"unrecognized table scan node: $relation, " + + throw SparkException.internalError(s"unrecognized table scan node: $relation, " + s"please turn off 
${SQLConf.OPTIMIZER_METADATA_ONLY.key} and try again.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index b96b9c25dda..e839d2c0691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.util.control.NonFatal -import org.apache.spark.{ErrorMessageFormat, SparkThrowable, SparkThrowableHelper} +import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable, SparkThrowableHelper} import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX} @@ -58,7 +58,7 @@ object SQLExecution extends Logging { // started execution of a query didn't call withNewExecutionId. The execution ID should be // set by calling withNewExecutionId in the action that begins execution, like // Dataset.collect or DataFrameWriter.insertInto. - throw new IllegalStateException("Execution ID should be set") + throw SparkException.internalError("Execution ID should be set") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d8e5d4f2270..2ef68887e87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._ import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunctionName, UnresolvedIdentifier} import org.apache.spark.sql.catalyst.catalog._ @@ -269,7 +270,7 @@ class SparkSqlAstBuilder extends AstBuilder { } else if (ctx.stringLit() != null) { SetCatalogCommand(string(visitStringLit(ctx.stringLit()))) } else { - throw new IllegalStateException("Invalid catalog name") + throw SparkException.internalError("Invalid catalog name") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index df770bd5eee..304ce0cd751 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import java.util.Locale +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -552,7 +553,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if (distinctAggChildSets.length > 1) { // This is a sanity check. We should not reach here when we have multiple distinct // column sets. Our `RewriteDistinctAggregates` should take care this case. - throw new IllegalStateException( + throw SparkException.internalError( "You hit a query analyzer bug. 
Please report your query to Spark user mailing list.") } @@ -782,27 +783,27 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil case logical.Distinct(child) => - throw new IllegalStateException( + throw SparkException.internalError( "logical distinct operator should have been replaced by aggregate in the optimizer") case logical.Intersect(left, right, false) => - throw new IllegalStateException( + throw SparkException.internalError( "logical intersect operator should have been replaced by semi-join in the optimizer") case logical.Intersect(left, right, true) => - throw new IllegalStateException( + throw SparkException.internalError( "logical intersect operator should have been replaced by union, aggregate" + " and generate operators in the optimizer") case logical.Except(left, right, false) => - throw new IllegalStateException( + throw SparkException.internalError( "logical except operator should have been replaced by anti-join in the optimizer") case logical.Except(left, right, true) => - throw new IllegalStateException( + throw SparkException.internalError( "logical except (all) operator should have been replaced by union, aggregate" + " and generate operators in the optimizer") case logical.ResolvedHint(child, hints) => - throw new IllegalStateException( + throw SparkException.internalError( "ResolvedHint operator should have been replaced by join hint in the optimizer") case Deduplicate(_, child) if !child.isStreaming => - throw new IllegalStateException( + throw SparkException.internalError( "Deduplicate operator for non streaming data source should have been replaced " + "by aggregate in the optimizer") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 0aefb0649d0..058df24fc13 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.mutable import scala.util.control.NonFatal -import org.apache.spark.broadcast +import org.apache.spark.{broadcast, SparkException} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -402,7 +402,7 @@ trait CodegenSupport extends SparkPlan { val errMsg = "Only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " + "in its data producing loop." 
if (Utils.isTesting) { - throw new IllegalStateException(errMsg) + throw SparkException.internalError(errMsg) } else { logWarning(s"[BUG] $errMsg Please open a JIRA ticket to report it.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala index 6b39ac70a62..12e8d0e2c60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala @@ -69,7 +69,7 @@ case class AQEShuffleReadExec private( case other => other } case _ => - throw new IllegalStateException("operating on canonicalization plan") + throw SparkException.internalError("operating on canonicalization plan") } } else if (isCoalescedRead) { // For coalesced shuffle read, the data distribution is not changed, only the number of @@ -90,7 +90,7 @@ case class AQEShuffleReadExec private( case r: RoundRobinPartitioning => r.copy(numPartitions = partitionSpecs.length) case other @ SinglePartition => - throw new IllegalStateException( + throw SparkException.internalError( "Unexpected partitioning for coalesced shuffle read: " + other) case _ => // Spark plugins may have custom partitioning and may replace this operator @@ -163,7 +163,7 @@ case class AQEShuffleReadExec private( assert(p.dataSize.isDefined) p.dataSize.get case p: PartialReducerPartitionSpec => p.dataSize - case p => throw new IllegalStateException(s"unexpected $p") + case p => throw SparkException.internalError(s"unexpected $p") }) } else { None @@ -253,7 +253,7 @@ case class AQEShuffleReadExec private( sendDriverMetrics() stage.shuffle.getShuffleRDD(partitionSpecs.toArray) case _ => - throw new IllegalStateException("operating on canonicalized plan") + throw SparkException.internalError("operating on canonicalized plan") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index b941feb12fc..89e9de8b084 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.concurrent.Future -import org.apache.spark.{FutureAction, MapOutputStatistics} +import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -179,7 +179,7 @@ case class ShuffleQueryStageExec( case s: ShuffleExchangeLike => s case ReusedExchangeExec(_, s: ShuffleExchangeLike) => s case _ => - throw new IllegalStateException(s"wrong plan for shuffle stage:\n ${plan.treeString}") + throw SparkException.internalError(s"wrong plan for shuffle stage:\n ${plan.treeString}") } def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize @@ -233,7 +233,7 @@ case class BroadcastQueryStageExec( case b: BroadcastExchangeLike => b case ReusedExchangeExec(_, b: BroadcastExchangeLike) => b case _ => - throw new IllegalStateException(s"wrong plan for broadcast stage:\n ${plan.treeString}") + throw SparkException.internalError(s"wrong plan for broadcast stage:\n ${plan.treeString}") } override protected def doMaterialize(): Future[Any] = { @@ -273,7 +273,7 @@ case class 
TableCacheQueryStageExec( @transient val inMemoryTableScan = plan match { case i: InMemoryTableScanExec => i case _ => - throw new IllegalStateException(s"wrong plan for table cache stage:\n ${plan.treeString}") + throw SparkException.internalError(s"wrong plan for table cache stage:\n ${plan.treeString}") } @transient diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala index 1377a984223..9523bf1a1c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregateCodegenSupport.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.aggregate +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, ExpressionEquals, UnsafeRow} @@ -343,7 +344,7 @@ trait AggregateCodegenSupport "length of at least one split function went over the JVM limit: " + CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH if (Utils.isTesting) { - throw new IllegalStateException(errMsg) + throw SparkException.internalError(errMsg) } else { logInfo(errMsg) None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala index 2427a39751f..5391d580759 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.aggregate +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge} import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution} @@ -102,9 +103,9 @@ trait BaseAggregateExec extends UnaryExecNode with PartitioningPreservingUnaryEx StatefulOperatorPartitioning.getCompatibleDistribution( exprs, parts, conf) :: Nil - case _ => - throw new IllegalStateException("Expected to set the number of partitions before " + - "constructing required child distribution!") + case _ => throw SparkException.internalError( + "Expected to set the number of partitions before " + + "constructing required child distribution!") } } else { ClusteredDistribution(exprs) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala index 4cc251a99db..57b8fd8570f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.aggregate -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.{config, Logging} import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.expressions._ @@ -109,7 +109,7 @@ class ObjectAggregationIterator( val defaultAggregationBuffer = createNewAggregationBuffer() generateOutput(UnsafeRow.createFromByteArray(0, 0), defaultAggregationBuffer) } else { - throw new IllegalStateException( + throw SparkException.internalError( "This method should not be called when groupingExpressions is not empty.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index db567dcd15b..1ebf0d143bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.aggregate -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.memory.SparkOutOfMemoryError import org.apache.spark.sql.catalyst.InternalRow @@ -461,7 +461,7 @@ class TungstenAggregationIterator( hashMap.free() resultCopy } else { - throw new IllegalStateException( + throw SparkException.internalError( "This method should not be called when groupingExpressions is not empty.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala index fee7e29f8ad..b5dfd4639d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/UpdatingSessionsExec.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.aggregate +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder} @@ -73,7 +74,7 @@ case class UpdatingSessionsExec( groupingWithoutSessionExpression, parts, conf) :: Nil case _ => - throw new IllegalStateException("Expected to set the number of partitions before " + + throw SparkException.internalError("Expected to set the number of partitions before " + "constructing required child distribution!") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala index 7e9628c3851..c2925a3ba59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.analysis import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.{Column, Dataset} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Cast, Equality, Expression, ExprId} import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} @@ -95,7 +96,8 @@ object DetectAmbiguousSelfJoin extends Rule[LogicalPlan] { colRefs.foreach { ref => if (ids.contains(ref.datasetId)) { if (ref.colPos < 0 || ref.colPos >= p.output.length) { - throw new IllegalStateException("[BUG] Hit an invalid Dataset column reference: " + + throw 
SparkException.internalError( + "Hit an invalid Dataset column reference: " + s"$ref. Please open a JIRA ticket to report it.") } else { // When self-join happens, the analyzer asks the right side plan to generate diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 2fd79935507..083858e4fe8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration -import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} +import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext} import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -866,15 +866,15 @@ case class SubqueryExec(name: String, child: SparkPlan, maxNumRows: Option[Int] } protected override def doExecute(): RDD[InternalRow] = { - throw new IllegalStateException("SubqueryExec.doExecute should never be called") + throw SparkException.internalError("SubqueryExec.doExecute should never be called") } override def executeTake(n: Int): Array[InternalRow] = { - throw new IllegalStateException("SubqueryExec.executeTake should never be called") + throw SparkException.internalError("SubqueryExec.executeTake should never be called") } override def executeTail(n: Int): Array[InternalRow] = { - throw new IllegalStateException("SubqueryExec.executeTail should never be called") + throw SparkException.internalError("SubqueryExec.executeTail should never be called") } override def stringArgs: Iterator[Any] = Iterator(name, child) ++ Iterator(s"[id=#$id]") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 750f49c25b6..af958208afd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.columnar import org.apache.commons.lang3.StringUtils -import org.apache.spark.TaskContext +import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -61,7 +61,7 @@ class DefaultCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer { schema: Seq[Attribute], storageLevel: StorageLevel, conf: SQLConf): RDD[CachedBatch] = - throw new IllegalStateException("Columnar input is not supported") + throw SparkException.internalError("Columnar input is not supported") override def convertInternalRowToCachedBatch( input: RDD[InternalRow], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala index 453bc47c19f..46044f6919d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -22,6 +22,7 @@ import java.nio.ByteOrder import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.types.{PhysicalBooleanType, PhysicalByteType, PhysicalDataType, PhysicalDoubleType, PhysicalFloatType, PhysicalIntegerType, PhysicalLongType, PhysicalShortType, PhysicalStringType} import org.apache.spark.sql.errors.QueryExecutionErrors @@ -350,7 +351,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme { decompress0(columnVector, capacity, getInt, putInt) case _: PhysicalLongType => decompress0(columnVector, capacity, getLong, putLong) - case _ => throw new IllegalStateException("Not supported type in RunLengthEncoding.") + case _ => throw SparkException.internalError("Not supported type in RunLengthEncoding.") } } } @@ -520,7 +521,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme { } pos += 1 } - case _ => throw new IllegalStateException("Not supported type in DictionaryEncoding.") + case _ => throw SparkException.internalError("Not supported type in DictionaryEncoding.") } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 668d2538e03..71b6d4b886b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -513,7 +513,7 @@ case class DataSource( qe.assertCommandExecuted() // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation() - case _ => throw new IllegalStateException( + case _ => throw SparkException.internalError( s"${providingClass.getCanonicalName} does not allow create table as select.") } } @@ -531,7 +531,7 @@ case class DataSource( disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false) DataSource.validateSchema(data.schema, sparkSession.sessionState.conf) planForWritingFileFormat(format, mode, data) - case _ => throw new IllegalStateException( + case _ => throw SparkException.internalError( s"${providingClass.getCanonicalName} does not allow create table as select.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index a5c1f6613b4..cf02826baf3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization -import org.apache.spark.SparkUpgradeException +import org.apache.spark.{SparkException, SparkUpgradeException} import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper} @@ -172,7 +172,7 @@ object 
DataSourceUtils extends PredicateHelper { (SQLConf.PARQUET_REBASE_MODE_IN_READ.key, ParquetOptions.DATETIME_REBASE_MODE) case "Avro" => (SQLConf.AVRO_REBASE_MODE_IN_READ.key, "datetimeRebaseMode") - case _ => throw new IllegalStateException(s"Unrecognized format $format.") + case _ => throw SparkException.internalError(s"Unrecognized format $format.") } QueryExecutionErrors.sparkUpgradeInReadingDatesError(format, config, option) } @@ -182,7 +182,7 @@ object DataSourceUtils extends PredicateHelper { case "Parquet INT96" => SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key case "Parquet" => SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key case "Avro" => SQLConf.AVRO_REBASE_MODE_IN_WRITE.key - case _ => throw new IllegalStateException(s"Unrecognized format $format.") + case _ => throw SparkException.internalError(s"Unrecognized format $format.") } QueryExecutionErrors.sparkUpgradeInWritingDatesError(format, config) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala index 421fa4ddace..dd429778e1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala @@ -22,6 +22,7 @@ import java.sql.{Driver, DriverManager} import scala.collection.mutable import scala.jdk.CollectionConverters._ +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -65,7 +66,7 @@ object DriverRegistry extends Logging { case d: DriverWrapper if d.wrapped.getClass.getCanonicalName == className => d.wrapped case d if d.getClass.getCanonicalName == className => d }.getOrElse { - throw new IllegalStateException( + throw SparkException.internalError( s"Did not find registered driver with class $className") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index e410789504e..7194033e603 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext import org.apache.parquet.io.api.{Binary, RecordConsumer} -import org.apache.spark.SPARK_VERSION_SHORT +import org.apache.spark.{SPARK_VERSION_SHORT, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow @@ -263,7 +263,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { case t: UserDefinedType[_] => makeWriter(t.sqlType) - case _ => throw new IllegalStateException(s"Unsupported data type $dataType.") + case _ => throw SparkException.internalError(s"Unsupported data type $dataType.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 16889b247f2..fe3140c8030 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -629,7 +629,8 @@ private[sql] object DataSourceV2Strategy extends Logging { expressions.Not(rebuildExpressionFromFilter(not.child(), translatedFilterToExpr)) case _ => translatedFilterToExpr.getOrElse(predicate, - throw new IllegalStateException("Failed to rebuild Expression for filter: " + predicate)) + throw SparkException.internalError( + "Failed to rebuild Expression for filter: " + predicate)) } } @@ -642,7 +643,8 @@ private[sql] object DataSourceV2Strategy extends Logging { protected[sql] def translateRuntimeFilterV2(expr: Expression): Option[Predicate] = expr match { case in @ InSubqueryExec(PushableColumnAndNestedColumn(name), _, _, _, _, _) => val values = in.values().getOrElse { - throw new IllegalStateException(s"Can't translate $in to v2 Predicate, no subquery result") + throw SparkException.internalError( + s"Can't translate $in to v2 Predicate, no subquery result") } val literals = values.map(LiteralValue(_, in.child.dataType)) Some(new Predicate("IN", FieldReference(name) +: literals)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 509f1e6a1e4..69705afbb7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -306,7 +306,7 @@ object ShuffleExchangeExec { case (partition, index) => (partition.toSeq(expressions.map(_.dataType)), index) }.toMap new KeyGroupedPartitioner(mutable.Map(valueMap.toSeq: _*), n) - case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") + case _ => throw SparkException.internalError(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. 
} def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match { @@ -334,7 +334,7 @@ object ShuffleExchangeExec { case SinglePartition => identity case KeyGroupedPartitioning(expressions, _, _, _) => row => bindReferences(expressions, outputAttributes).map(_.eval(row)) - case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning") + case _ => throw SparkException.internalError(s"Exchange not implemented for $newPartitioning") } val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] && diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 69d288ae75c..f0e58766dc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -24,7 +24,7 @@ import scala.concurrent.duration._ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.errors.QueryExecutionErrors @@ -230,7 +230,7 @@ object SQLMetrics { } else if (metricsType == NS_TIMING_METRIC) { duration => Utils.msDurationToString(duration.nanos.toMillis) } else { - throw new IllegalStateException(s"unexpected metrics type: $metricsType") + throw SparkException.internalError(s"unexpected metrics type: $metricsType") } val validValues = values.filter(_ >= 0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index 48db2560da9..3b18e843c6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -24,6 +24,7 @@ import scala.jdk.CollectionConverters._ import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler} +import org.apache.spark.SparkException import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -182,9 +183,9 @@ object EvaluatePython { case c if c.getClass.isArray => val array = c.asInstanceOf[Array[_]] if (array.length != fields.length) { - throw new IllegalStateException( + throw SparkException.internalError( s"Input row doesn't have expected number of values required by the schema. " + - s"${fields.length} fields are required while ${array.length} values are provided." + s"${fields.length} fields are required while ${array.length} values are provided." 
) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 711da3ff3f1..dcd6603f649 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.python import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression @@ -262,7 +263,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] { val evalTypes = validUdfs.map(_.evalType).toSet if (evalTypes.size != 1) { - throw new IllegalStateException( + throw SparkException.internalError( "Expected udfs have the same evalType but got different evalTypes: " + evalTypes.mkString(",")) } @@ -274,7 +275,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] { | PythonEvalType.SQL_ARROW_BATCHED_UDF => ArrowEvalPython(validUdfs, resultAttrs, child, evalType) case _ => - throw new IllegalStateException("Unexpected UDF evalType") + throw SparkException.internalError("Unexpected UDF evalType") } attributeMap ++= validUdfs.map(canonicalizeDeterministic).zip(resultAttrs) @@ -286,7 +287,7 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] { // Other cases are disallowed as they are ambiguous or would require a cartesian // product. udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach { udf => - throw new IllegalStateException( + throw SparkException.internalError( s"Invalid PythonUDF $udf, requires attributes from more than one child.") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala index 32d010b00d0..105c5ca6493 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.python -import org.apache.spark.{JobArtifactSet, TaskContext} +import org.apache.spark.{JobArtifactSet, SparkException, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow @@ -145,7 +145,7 @@ case class FlatMapGroupsInPandasWithStateExec( case ProcessingTimeTimeout => batchTimestampMs.get case EventTimeTimeout => eventTimeWatermarkForEviction.get case _ => - throw new IllegalStateException( + throw SparkException.internalError( s"Cannot filter timed out keys for $timeoutConf") } val timingOutPairs = stateManager.getAllState(store).filter { state => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala index 68ea33572c8..60ce6b06890 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala @@ -19,6 +19,7 @@ package 
org.apache.spark.sql.execution.window import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -91,7 +92,7 @@ private[window] object AggregateProcessor { updateExpressions ++= noOps evaluateExpressions += imperative case other => - throw new IllegalStateException(s"Unsupported aggregate function: $other") + throw SparkException.internalError(s"Unsupported aggregate function: $other") } // Create the projections. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala index 4491861dd9d..bdaccd43c1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.window import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Add, AggregateWindowFunction, Ascending, Attribute, BoundReference, CurrentRow, DateAdd, DateAddYMInterval, DecimalAddNoOverflowCheck, Descending, Expression, FrameLessOffsetWindowFunction, FrameType, IdentityProjection, IntegerLiteral, MutableProjection, NamedExpression, OffsetWindowFunction, PythonFuncExpression, RangeFrame, RowFrame, RowOrdering, SortOrder, SpecifiedWindowFrame, TimeAdd, TimestampAddYMInterval, UnaryMinus, UnboundedFol [...] import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression @@ -76,7 +77,7 @@ trait WindowEvaluatorFactoryBase { RowBoundOrdering(offset) case (RowFrame, _) => - throw new IllegalStateException(s"Unhandled bound in windows expressions: $bound") + throw SparkException.internalError(s"Unhandled bound in windows expressions: $bound") case (RangeFrame, CurrentRow) => val ordering = RowOrdering.create(orderSpec, childOutput) @@ -119,7 +120,7 @@ trait WindowEvaluatorFactoryBase { RangeBoundOrdering(ordering, current, bound) case (RangeFrame, _) => - throw new IllegalStateException("Non-Zero range offsets are not supported for windows " + + throw SparkException.internalError("Non-Zero range offsets are not supported for windows " + "with multiple order expressions.") } } @@ -168,7 +169,7 @@ trait WindowEvaluatorFactoryBase { case _ => collect("AGGREGATE", frame, e, f) } case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f) - case f => throw new IllegalStateException(s"Unsupported window function: $f") + case f => throw SparkException.internalError(s"Unsupported window function: $f") } case _ => } @@ -275,7 +276,7 @@ trait WindowEvaluatorFactoryBase { } case _ => - throw new IllegalStateException(s"Unsupported factory: $key") + throw SparkException.internalError(s"Unsupported factory: $key") } // Keep track of the number of expressions. This is a side-effect in a map... 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala index e897fdfe008..fd3df372a2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.expressions +import org.apache.spark.SparkException import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -72,7 +73,7 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T) override def finish(reduction: (Boolean, T)): T = { if (!reduction._1) { - throw new IllegalStateException("ReduceAggregator requires at least one input row") + throw SparkException.internalError("ReduceAggregator requires at least one input row") } reduction._2 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 90082c92291..4ac05373e5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -104,7 +104,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with Eventually { SparkSession.clearActiveSession() assert(SparkSession.active == session) SparkSession.clearDefaultSession() - intercept[IllegalStateException](SparkSession.active) + intercept[SparkException](SparkSession.active) session.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala index b14f4a405f6..058719f265d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala @@ -108,11 +108,14 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession { val df = spark.range(10) val planner = spark.sessionState.planner val deduplicate = Deduplicate(df.queryExecution.analyzed.output, df.queryExecution.analyzed) - val err = intercept[IllegalStateException] { - planner.plan(deduplicate) - } - assert(err.getMessage.contains("Deduplicate operator for non streaming data source " + - "should have been replaced by aggregate in the optimizer")) + checkError( + exception = intercept[SparkException] { + planner.plan(deduplicate) + }, + errorClass = "INTERNAL_ERROR", + parameters = Map( + "message" -> ("Deduplicate operator for non streaming data source should have been " + + "replaced by aggregate in the optimizer"))) } test("SPARK-37221: The collect-like API in SparkPlan should support columnar output") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index faefc240b79..18c6113416a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -857,16 +857,19 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1", "spark.sql.CodeGenerator.validParamLength" -> "0") { withTable("t") { - val expectedErrMsg = "Failed to split aggregate code into small functions" + val expectedErrMsg = 
"Failed to split aggregate code into small functions.*" Seq( // Test case without keys "SELECT AVG(v) FROM VALUES(1) t(v)", // Tet case with keys "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query => - val e = intercept[IllegalStateException] { - sql(query).collect() - } - assert(e.getMessage.contains(expectedErrMsg)) + checkError( + exception = intercept[SparkException] { + sql(query).collect() + }, + errorClass = "INTERNAL_ERROR", + parameters = Map("message" -> expectedErrMsg), + matchPVals = true) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index a76360439e6..a67b1b69f6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1014,11 +1014,13 @@ class AdaptiveQueryExecSuite val read = reads.head val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec] // we can't just call execute() because that has separate checks for canonicalized plans - val ex = intercept[IllegalStateException] { - val doExecute = PrivateMethod[Unit](Symbol("doExecute")) - c.invokePrivate(doExecute()) - } - assert(ex.getMessage === "operating on canonicalized plan") + checkError( + exception = intercept[SparkException] { + val doExecute = PrivateMethod[Unit](Symbol("doExecute")) + c.invokePrivate(doExecute()) + }, + errorClass = "INTERNAL_ERROR", + parameters = Map("message" -> "operating on canonicalized plan")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala index c1071373287..10122c041c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.expressions -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -72,7 +72,7 @@ class ReduceAggregatorSuite extends SparkFunSuite { val func = (v1: Int, v2: Int) => v1 + v2 val aggregator: ReduceAggregator[Int] = new ReduceAggregator(func)(Encoders.scalaInt) - intercept[IllegalStateException] { + intercept[SparkException] { aggregator.finish(aggregator.zero) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 849f6f15189..ba87ad37130 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructF import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.hive.serde2.typeinfo.{DecimalTypeInfo, TypeInfoFactory} +import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -916,7 +917,7 @@ private[hive] trait HiveInspectors { toInspector(dt.sqlType) // We will enumerate all of the possible constant expressions, throw 
exception if we missed case Literal(_, dt) => - throw new IllegalStateException(s"Hive doesn't support the constant type [$dt].") + throw SparkException.internalError(s"Hive doesn't support the constant type [$dt].") // ideally, we don't test the foldable here(but in optimizer), however, some of the // Hive UDF / UDAF requires its argument to be constant objectinspector, we do it eagerly. case _ if expr.foldable => toInspector(Literal.create(expr.eval(), expr.dataType)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3b9e2733352..72388a8d4b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -842,7 +842,7 @@ private[hive] class HiveClientImpl( val maxResults = 100000 val results = runHive(sql, maxResults) // It is very confusing when you only get back some of the results... - if (results.size == maxResults) throw new IllegalStateException("RESULTS POSSIBLY TRUNCATED") + if (results.size == maxResults) throw SparkException.internalError("RESULTS POSSIBLY TRUNCATED") results } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala index 6fd8892fa1f..b3b9ebea21c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.errors.QueryExecutionErrors @@ -54,7 +55,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P if (allSupportedHiveVersions.contains(hiveVersion)) { externalTempPath(path, stagingDir) } else { - throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) + throw SparkException.internalError("Unsupported hive version: " + hiveVersion.fullVersion) } } @@ -143,7 +144,7 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P stagingDirForCreating.foreach { stagingDir => val fs: FileSystem = stagingDir.getFileSystem(hadoopConf) if (!FileUtils.mkdir(fs, stagingDir, true, hadoopConf)) { - throw new IllegalStateException( + throw SparkException.internalError( "Cannot create staging directory '" + stagingDir.toString + "'") } fs.deleteOnExit(stagingDir) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
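
All of the hunks above apply one mechanical pattern: an internal invariant violation that used to surface as a bare `IllegalStateException` is now reported through the error framework as `SparkException.internalError`, i.e. a `SparkException` carrying the `INTERNAL_ERROR` error class. The following is a minimal, self-contained sketch of that pattern; the object and method names are illustrative only (they do not come from Spark's sources), and it assumes `spark-core` is on the classpath so that `org.apache.spark.SparkException` is available.

```scala
import org.apache.spark.SparkException

object InternalErrorSketch {
  // Hypothetical helper, named for illustration only.
  // Before this commit the equivalent code would have done:
  //   throw new IllegalStateException("Execution ID should be set")
  // After the commit, the same condition is reported as an internal error:
  def requireExecutionId(id: Option[Long]): Long =
    id.getOrElse(throw SparkException.internalError("Execution ID should be set"))

  def main(args: Array[String]): Unit = {
    try {
      requireExecutionId(None)
    } catch {
      case e: SparkException =>
        // Callers and tests can now match on the error class rather than the exception type.
        assert(e.getErrorClass == "INTERNAL_ERROR")
        println(e.getMessage)
    }
  }
}
```

On the test side, the suites make the corresponding switch: instead of `intercept[IllegalStateException]` plus a message-substring assertion, they use `checkError(exception = intercept[SparkException] {...}, errorClass = "INTERNAL_ERROR", parameters = Map("message" -> ...))`, as the `ArrowUtilsSuite`, `SparkPlanSuite`, and `AdaptiveQueryExecSuite` hunks above show.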