beliefer commented on a change in pull request #32958:
URL: https://github.com/apache/spark/pull/32958#discussion_r656745122
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
##########
@@ -1422,4 +1426,251 @@ object QueryExecutionErrors {
def invalidStreamingOutputModeError(outputMode: Option[OutputMode]): Throwable = {
new UnsupportedOperationException(s"Invalid output mode: $outputMode")
}
+
+ def multiFailuresInStageMaterializationError(error: Throwable): Throwable = {
+ new SparkException("Multiple failures in stage materialization.", error)
+ }
+
+ def unrecognizedCompressionSchemaTypeIDError(typeId: Int): Throwable = {
+ new UnsupportedOperationException(s"Unrecognized compression scheme type
ID: $typeId")
+ }
+
+ def getParentLoggerNotImplementedError(className: String): Throwable = {
+ new SQLFeatureNotSupportedException(s"$className.getParentLogger is not
yet implemented.")
+ }
+
+ def cannotCreateParquetConverterForTypeError(t: DecimalType, parquetType: String): Throwable = {
+ new RuntimeException(
+ s"""
+ |Unable to create Parquet converter for ${t.typeName}
+ |whose Parquet type is $parquetType without decimal metadata. Please read this
+ |column/field as Spark BINARY type.
+ """.stripMargin.replaceAll("\n", " "))
+ }
+
+ def cannotCreateParquetConverterForDecimalTypeError(
+ t: DecimalType, parquetType: String): Throwable = {
+ new RuntimeException(
+ s"""
+ |Unable to create Parquet converter for decimal type ${t.json} whose Parquet type is
+ |$parquetType. Parquet DECIMAL type can only be backed by INT32, INT64,
+ |FIXED_LEN_BYTE_ARRAY, or BINARY.
+ """.stripMargin.replaceAll("\n", " "))
+ }
+
+ def cannotCreateParquetConverterForDataTypeError(
+ t: DataType, parquetType: String): Throwable = {
+ new RuntimeException(s"Unable to create Parquet converter for data type
${t.json} " +
+ s"whose Parquet type is $parquetType")
+ }
+
+ def cannotAddMultiPartitionsOnNonatomicPartitionTableError(tableName: String): Throwable = {
+ new UnsupportedOperationException(
+ s"Nonatomic partition table $tableName can not add multiple partitions.")
+ }
+
+ def userSpecifiedSchemaUnsupportedByDataSourceError(provider: TableProvider): Throwable = {
+ new UnsupportedOperationException(
+ s"${provider.getClass.getSimpleName} source does not support
user-specified schema.")
+ }
+
+ def cannotDropMultiPartitionsOnNonatomicPartitionTableError(tableName: String): Throwable = {
+ new UnsupportedOperationException(
+ s"Nonatomic partition table $tableName can not drop multiple
partitions.")
+ }
+
+ def truncateMultiPartitionUnsupportedError(tableName: String): Throwable = {
+ new UnsupportedOperationException(
+ s"The table $tableName does not support truncation of multiple
partition.")
+ }
+
+ def overwriteTableByUnsupportedExpressionError(table: Table): Throwable = {
+ new SparkException(s"Table does not support overwrite by expression:
$table")
+ }
+
+ def dynamicPartitionOverwriteUnsupportedByTableError(table: Table): Throwable = {
+ new SparkException(s"Table does not support dynamic partition overwrite:
$table")
+ }
+
+ def failedMergingSchemaError(schema: StructType, e: SparkException): Throwable = {
+ new SparkException(s"Failed merging schema:\n${schema.treeString}", e)
+ }
+
+ def cannotBroadcastExceedMaxTableRowsError(
+ maxBroadcastTableRows: Long, numRows: Long): Throwable = {
+ new SparkException(
+ s"Cannot broadcast the table over $maxBroadcastTableRows rows: $numRows
rows")
+ }
+
+ def cannotBroadcastExceedMaxTableBytesError(
+ maxBroadcastTableBytes: Long, dataSize: Long): Throwable = {
+ new SparkException("Cannot broadcast the table that is larger than" +
+ s" ${maxBroadcastTableBytes >> 30}GB: ${dataSize >> 30} GB")
+ }
+
+ def notEnoughMemoryToBuildAndBroadcastTableError(oe: OutOfMemoryError): Throwable = {
+ new OutOfMemoryError("Not enough memory to build and broadcast the table
to all " +
+ "worker nodes. As a workaround, you can either disable broadcast by
setting " +
+ s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the
spark " +
+ s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher
value.")
+ .initCause(oe.getCause)
+ }
+
+ def executeUnsupportedByExecError(execName: String): Throwable = {
+ new UnsupportedOperationException(s"$execName does not support the
execute() code path.")
+ }
+
+ def cannotMergeClassWithOtherClassError(className: String, otherClass: String): Throwable = {
+ new UnsupportedOperationException(
+ s"Cannot merge $className with $otherClass")
+ }
+
+ def continuousProcessingUnsupportedByDataSourceError(sourceName: String): Throwable = {
+ new UnsupportedOperationException(
+ s"Data source $sourceName does not support continuous processing.")
+ }
+
+ def failedReadDataError(failureReason: Throwable): Throwable = {
+ new SparkException("Data read failed", failureReason)
+ }
+
+ def failedGenerateEpochMarkerError(failureReason: Throwable): Throwable = {
+ new SparkException("Epoch marker generation failed", failureReason)
+ }
+
+ def failedWriteJobError(failureReason: Throwable): Throwable = {
+ new SparkException("Writing job aborted.", failureReason)
+ }
+
+ def foreachWriterAbortedDueToTaskFailureError(): Throwable = {
+ new SparkException("Foreach writer has been aborted due to a task failure")
+ }
+
+ def integerOverflowError(message: String): Throwable = {
+ new ArithmeticException(s"Integer overflow. $message")
+ }
+
+ def failedReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+ new IOException(
+ s"Error reading delta file $fileToRead of $clazz: key size cannot be
$keySize")
+ }
+
+ def failedReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
+ new IOException(s"Error reading snapshot file $fileToRead of $clazz:
$message")
+ }
+
+ def cannotPurgeAsBreakInternalStateError(): Throwable = {
+ new UnsupportedOperationException("Cannot purge as it might break internal
state.")
+ }
+
+ def cleanUpSourceFilesUnsupportedError(): Throwable = {
+ new UnsupportedOperationException("Clean up source files is not supported
when" +
+ " reading from the output directory of FileStreamSink.")
+ }
+
+ def latestOffsetNotCalledError(): Throwable = {
+ new UnsupportedOperationException(
+ "latestOffset(Offset, ReadLimit) should be called instead of this
method")
+ }
+
+ def legacyCheckpointDirectoryExistsError(
+ checkpointPath: Path, legacyCheckpointDir: String): Throwable = {
+ new SparkException(
+ s"""
+ |Error: we detected a possible problem with the location of your checkpoint and you
+ |likely need to move it before restarting this query.
+ |
+ |Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
+ |structured streaming. While this was corrected in Spark 3.0, it appears that your
+ |query was started using an earlier version that incorrectly handled the checkpoint
+ |path.
+ |
+ |Correct Checkpoint Directory: $checkpointPath
+ |Incorrect Checkpoint Directory: $legacyCheckpointDir
+ |
+ |Please move the data from the incorrect directory to the correct one, delete the
+ |incorrect directory, and then restart this query. If you believe you are receiving
+ |this message in error, you can disable it with the SQL conf
+ |${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}.
+ """.stripMargin)
+ }
+
+ def subProcessExitedError(
+ exitCode: Int, stderrBuffer: CircularBuffer, cause: Throwable): Throwable = {
+ new SparkException(s"Subprocess exited with status $exitCode. " +
+ s"Error: ${stderrBuffer.toString}", cause)
+ }
+
+ def outputDataTypeUnsupportedByNodeWithoutSerdeError(
+ nodeName: String, dt: DataType): Throwable = {
+ new SparkException(s"$nodeName without serde does not support " +
+ s"${dt.getClass.getSimpleName} as output data type")
+ }
+
+ def invalidStartIndexError(numRows: Int, startIndex: Int): Throwable = {
+ new ArrayIndexOutOfBoundsException(
+ "Invalid `startIndex` provided for generating iterator over the array. " +
+ s"Total elements: $numRows, requested `startIndex`: $startIndex")
+ }
+
+ def concurrentModificationOnExternalAppendOnlyUnsafeRowArrayError(
+ className: String): Throwable = {
+ new ConcurrentModificationException(
+ s"The backing $className has been modified since the creation of this
Iterator")
+ }
+
+ def doExecuteBroadcastNotImplementedByNodeError(nodeName: String): Throwable = {
+ new UnsupportedOperationException(s"$nodeName does not implement
doExecuteBroadcast")
+ }
+
+ def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = {
+ new SparkException(
+ s"""
+ |$globalTempDB is a system preserved database, please rename your existing database
+ |to resolve the name conflict, or set a different value for
+ |${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.
+ """.stripMargin.split("\n").mkString(" "))
+ }
+
+ def commentOnTableUnsupportedError(): Throwable = {
+ new SQLFeatureNotSupportedException("comment on table is not supported")
+ }
+
+ def unsupportedUpdateColumnNullabilityError(): Throwable = {
+ new SQLFeatureNotSupportedException("UpdateColumnNullability is not
supported")
+ }
+
+ def renameColumnUnsupportedForOlderMySQLError(): Throwable = {
+ new SQLFeatureNotSupportedException(
+ "Rename column is only supported for MySQL version 8.0 and above.")
+ }
+
+ def hitAnErrorWhenExecutingQueryError(e: Throwable): QueryExecutionException = {
Review comment:
Please refer to
https://github.com/apache/spark/blob/960a7e5fceb2799882e5264540b2074ba9413e89/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala#L64
So this needs to return an `Exception`.
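   For context, a minimal, self-contained sketch of the point above (the stub types and the helper body here are illustrative assumptions, not Spark code): `QueryExecutionListener.onFailure` declares its exception parameter as `Exception`, so a helper that only promises a `Throwable` could not be passed to that callback directly.
```scala
// Standalone sketch: why the helper must produce an Exception rather than a Throwable.
// QueryExecutionStub and onFailure are stand-ins for the Spark types referenced above.
object ListenerReturnTypeSketch {
  final case class QueryExecutionStub(sql: String)

  // Mirrors the shape of QueryExecutionListener.onFailure, whose last parameter
  // is declared as Exception (see the linked L64).
  def onFailure(funcName: String, qe: QueryExecutionStub, exception: Exception): Unit =
    println(s"$funcName failed on '${qe.sql}': ${exception.getMessage}")

  // Hypothetical helper in the spirit of hitAnErrorWhenExecutingQueryError:
  // wrapping the Throwable cause in an Exception keeps the call below type-correct.
  def hitAnErrorWhenExecutingQueryError(e: Throwable): Exception =
    new RuntimeException(s"Hit an error when executing a query: ${e.getMessage}", e)

  def main(args: Array[String]): Unit = {
    val cause: Throwable = new Error("boom")
    // Passing `cause` directly would not compile; the wrapped Exception does.
    onFailure("collect", QueryExecutionStub("SELECT 1"), hitAnErrorWhenExecutingQueryError(cause))
  }
}
```
   The exact return type chosen in the PR may differ; the sketch only illustrates the type constraint imposed by the listener callback.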