This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c2bd7bac76a  [SPARK-39165][SQL][3.3] Replace `sys.error` by `IllegalStateException`
c2bd7bac76a is described below

commit c2bd7bac76a5cf7ffc5ef61a1df2b8bb5a72f131
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Fri May 13 12:47:53 2022 +0300

    [SPARK-39165][SQL][3.3] Replace `sys.error` by `IllegalStateException`

    ### What changes were proposed in this pull request?
    Replace all invocations of `sys.error()` with throwing `IllegalStateException` in the `sql` namespace.

    This is a backport of https://github.com/apache/spark/pull/36524.

    ### Why are the changes needed?
    In the context of wrapping all internal errors such as asserts and illegal-state exceptions (see https://github.com/apache/spark/pull/36500), it is impossible to distinguish the `RuntimeException` thrown by `sys.error()` from Spark's own exceptions such as `SparkRuntimeException`. The latter can be propagated to user space, but `sys.error` exceptions should not be visible to users in regular cases.

    ### Does this PR introduce _any_ user-facing change?
    No, it shouldn't. `sys.error` should not propagate exceptions to user space in regular cases.

    ### How was this patch tested?
    By running the existing test suites.

    Authored-by: Max Gekk <max.gekk@gmail.com>
    Signed-off-by: Max Gekk <max.gekk@gmail.com>
    (cherry picked from commit 95c7efd7571464d8adfb76fb22e47a5816cf73fb)
    Signed-off-by: Max Gekk <max.gekk@gmail.com>

    Closes #36532 from MaxGekk/sys_error-internal-3.3.

    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
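[Editor's note: a minimal sketch of the motivation above, not code from this commit. `UserFacingError` is a hypothetical stand-in for `org.apache.spark.SparkRuntimeException`, which also extends `java.lang.RuntimeException`. The point: `sys.error()` throws a bare `RuntimeException`, so a generic handler cannot tell an internal bug from a legitimate user-facing error, while `IllegalStateException` can be matched precisely and wrapped as an internal error.]

```scala
// Editorial sketch only. scala.sys.error is defined in the standard library as:
//   def error(message: String): Nothing = throw new RuntimeException(message)
object SysErrorSketch extends App {

  // Hypothetical stand-in for SparkRuntimeException (extends RuntimeException).
  class UserFacingError(msg: String) extends RuntimeException(msg)

  def run(body: => Unit): Unit =
    try body catch {
      case e: UserFacingError =>
        // Legitimate error: may be propagated to user space.
        println(s"propagate to the user: ${e.getMessage}")
      case e: IllegalStateException =>
        // After this patch, internal invariant violations arrive here and
        // can be wrapped/reported as internal errors before reaching users.
        println(s"internal error, please report a bug: ${e.getMessage}")
      case e: RuntimeException =>
        // sys.error() lands here, indistinguishable from any other bare
        // RuntimeException thrown by third-party code.
        println(s"ambiguous RuntimeException: ${e.getMessage}")
    }

  run(sys.error("planner bug"))                               // ambiguous
  run(throw new IllegalStateException("Unsupported factory")) // clearly internal
  run(throw new UserFacingError("bad input"))                 // user-facing
}
```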
---
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala    | 4 ++--
 .../org/apache/spark/sql/execution/datasources/DataSource.scala   | 8 ++++----
 .../sql/execution/datasources/parquet/ParquetWriteSupport.scala   | 3 +--
 .../apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++--
 .../org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala | 5 +++--
 .../scala/org/apache/spark/sql/execution/streaming/memory.scala   | 3 ++-
 .../execution/streaming/sources/TextSocketMicroBatchStream.scala  | 3 ++-
 .../src/main/scala/org/apache/spark/sql/execution/subquery.scala  | 3 ++-
 .../apache/spark/sql/execution/window/AggregateProcessor.scala    | 2 +-
 .../org/apache/spark/sql/execution/window/WindowExecBase.scala    | 8 ++++----
 .../src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala | 3 ++-
 .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala   | 2 +-
 12 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3b8a70ffe94..17f3cfbda89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -503,8 +503,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             _.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) {
           // This is a sanity check. We should not reach here when we have multiple distinct
           // column sets. Our `RewriteDistinctAggregates` should take care this case.
-          sys.error("You hit a query analyzer bug. Please report your query to " +
-            "Spark user mailing list.")
+          throw new IllegalStateException(
+            "You hit a query analyzer bug. Please report your query to Spark user mailing list.")
         }
 
         // Ideally this should be done in `NormalizeFloatingNumbers`, but we do it here because
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 2bb3d48c145..143fb4cf960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -539,8 +539,8 @@ case class DataSource(
         DataWritingCommand.propogateMetrics(sparkSession.sparkContext, resolved, metrics)
         // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
         copy(userSpecifiedSchema = Some(outputColumns.toStructType.asNullable)).resolveRelation()
-      case _ =>
-        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+      case _ => throw new IllegalStateException(
+        s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
 
@@ -556,8 +556,8 @@ case class DataSource(
       disallowWritingIntervals(data.schema.map(_.dataType), forbidAnsiIntervals = false)
       DataSource.validateSchema(data.schema)
       planForWritingFileFormat(format, mode, data)
-      case _ =>
-        sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
+      case _ => throw new IllegalStateException(
+        s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index e71863657dd..a4122fe0bdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -254,8 +254,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       case t: UserDefinedType[_] => makeWriter(t.sqlType)
 
-      // TODO Adds IntervalType support
-      case _ => sys.error(s"Unsupported data type $dataType.")
+      case _ => throw new IllegalStateException(s"Unsupported data type $dataType.")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index c033aedc778..f3eb5636bb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -300,7 +300,7 @@ object ShuffleExchangeExec {
         override def numPartitions: Int = 1
         override def getPartition(key: Any): Int = 0
       }
-      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
+      case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
@@ -319,7 +319,7 @@ object ShuffleExchangeExec {
         val projection = UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes)
         row => projection(row)
       case SinglePartition => identity
-      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
+      case _ => throw new IllegalStateException(s"Exchange not implemented for $newPartitioning")
     }
 
     val isRoundRobin = newPartitioning.isInstanceOf[RoundRobinPartitioning] &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index a809ea07d0e..a6a5423b1f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -284,8 +284,9 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
       }
       // Other cases are disallowed as they are ambiguous or would require a cartesian
       // product.
-      udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach {
-        udf => sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
+      udfs.map(canonicalizeDeterministic).filterNot(attributeMap.contains).foreach { udf =>
+        throw new IllegalStateException(
+          s"Invalid PythonUDF $udf, requires attributes from more than one child.")
       }
 
       val rewritten = plan.withNewChildren(newChildren).transformExpressions {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index dd09a38c8b3..1d377350253 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -257,7 +257,8 @@ case class MemoryStream[A : Encoder](
     val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
     if (offsetDiff < 0) {
-      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      throw new IllegalStateException(
+        s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
     }
 
     batches.trimStart(offsetDiff)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index 04431f3d381..580f7066e44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -155,7 +155,8 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
     val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
 
     if (offsetDiff < 0) {
-      sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      throw new IllegalStateException(
+        s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
     }
 
     batches.trimStart(offsetDiff)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index afd0aba0068..4bbfc3467d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -79,7 +79,8 @@ case class ScalarSubquery(
   def updateResult(): Unit = {
     val rows = plan.executeCollect()
     if (rows.length > 1) {
-      sys.error(s"more than one row returned by a subquery used as an expression:\n$plan")
+      throw new IllegalStateException(
+        s"more than one row returned by a subquery used as an expression:\n$plan")
     }
     if (rows.length == 1) {
       assert(rows(0).numFields == 1,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
index 1ebbd5f4064..e40373917c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -90,7 +90,7 @@ private[window] object AggregateProcessor {
         updateExpressions ++= noOps
         evaluateExpressions += imperative
       case other =>
-        sys.error(s"Unsupported aggregate function: $other")
+        throw new IllegalStateException(s"Unsupported aggregate function: $other")
     }
 
     // Create the projections.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
index 5f1758d12fd..31b7df1abd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
@@ -97,7 +97,7 @@ trait WindowExecBase extends UnaryExecNode {
         RowBoundOrdering(offset)
 
       case (RowFrame, _) =>
-        sys.error(s"Unhandled bound in windows expressions: $bound")
+        throw new IllegalStateException(s"Unhandled bound in windows expressions: $bound")
 
       case (RangeFrame, CurrentRow) =>
         val ordering = RowOrdering.create(orderSpec, child.output)
@@ -139,7 +139,7 @@ trait WindowExecBase extends UnaryExecNode {
         RangeBoundOrdering(ordering, current, bound)
 
       case (RangeFrame, _) =>
-        sys.error("Non-Zero range offsets are not supported for windows " +
+        throw new IllegalStateException("Non-Zero range offsets are not supported for windows " +
           "with multiple order expressions.")
     }
   }
@@ -189,7 +189,7 @@ trait WindowExecBase extends UnaryExecNode {
             }
           case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
          case f: PythonUDF => collect("AGGREGATE", frame, e, f)
-          case f => sys.error(s"Unsupported window function: $f")
+          case f => throw new IllegalStateException(s"Unsupported window function: $f")
         }
       case _ =>
     }
@@ -296,7 +296,7 @@ trait WindowExecBase extends UnaryExecNode {
       }
 
       case _ =>
-        sys.error(s"Unsupported factory: $key")
+        throw new IllegalStateException(s"Unsupported factory: $key")
     }
 
     // Keep track of the number of expressions. This is a side-effect in a map...
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index f49018b0c85..455735a1879 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -922,7 +922,8 @@ private[hive] trait HiveInspectors {
     case Literal(_, dt: UserDefinedType[_]) => toInspector(dt.sqlType)
 
     // We will enumerate all of the possible constant expressions, throw exception if we missed
-    case Literal(_, dt) => sys.error(s"Hive doesn't support the constant type [$dt].")
+    case Literal(_, dt) =>
+      throw new IllegalStateException(s"Hive doesn't support the constant type [$dt].")
     // ideally, we don't test the foldable here(but in optimizer), however, some of the
     // Hive UDF / UDAF requires its argument to be constant objectinspector, we do it eagerly.
     case _ if expr.foldable => toInspector(Literal.create(expr.eval(), expr.dataType))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 3dddca84475..d70ac781c03 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -801,7 +801,7 @@ private[hive] class HiveClientImpl(
     val maxResults = 100000
     val results = runHive(sql, maxResults)
     // It is very confusing when you only get back some of the results...
-    if (results.size == maxResults) sys.error("RESULTS POSSIBLY TRUNCATED")
+    if (results.size == maxResults) throw new IllegalStateException("RESULTS POSSIBLY TRUNCATED")
     results
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org