karenfeng commented on a change in pull request #33774:
URL: https://github.com/apache/spark/pull/33774#discussion_r691349724
##########
File path: core/src/main/resources/error/error-classes.json
##########
@@ -1,8 +1,61 @@
{
+ "ADD_FILES_WITH_ABSOLUTE_PATH_UNSUPPORTED_ERROR" : {
Review comment:
Let's remove the `_ERROR` boilerplate suffix for simplicity.
##########
File path: core/src/main/scala/org/apache/spark/SparkException.scala
##########
@@ -79,3 +83,66 @@ class SparkArithmeticException(errorClass: String,
messageParameters: Array[Stri
override def getErrorClass: String = errorClass
override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
}
+
+/**
+ * Unsupported Operation exception thrown from Spark with an error class.
+ */
+class SparkUnsupportedOperationException(errorClass: String,
messageParameters: Array[String])
+ extends UnsupportedOperationException(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
+
+ override def getErrorClass: String = errorClass
+ override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
+}
+
+/**
+ * File doesn't be found exception thrown from Spark with an error class.
Review comment:
Nit: doesn't -> not
##########
File path: core/src/main/scala/org/apache/spark/SparkException.scala
##########
@@ -79,3 +83,66 @@ class SparkArithmeticException(errorClass: String,
messageParameters: Array[Stri
override def getErrorClass: String = errorClass
override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
}
+
+/**
+ * Unsupported Operation exception thrown from Spark with an error class.
+ */
+class SparkUnsupportedOperationException(errorClass: String,
messageParameters: Array[String])
+ extends UnsupportedOperationException(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
+
+ override def getErrorClass: String = errorClass
+ override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
+}
+
+/**
+ * File doesn't be found exception thrown from Spark with an error class.
+ */
+class SparkFileNotFoundException(errorClass: String, messageParameters:
Array[String])
+ extends FileNotFoundException(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
+
+ override def getErrorClass: String = errorClass
+ override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
+}
+
+/**
+ * Concurrent modification exception thrown from Spark with an error class.
+ */
+class SparkConcurrentModificationException(
+ errorClass: String,
+ messageParameters: Array[String],
+ cause: Throwable)
+ extends ConcurrentModificationException(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
+
+ override def getErrorClass: String = errorClass
+ override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
+}
+
+/**
+ * Runtime exception thrown from Spark with an error class.
+ */
+class SparkRuntimeException(errorClass: String, messageParameters:
Array[String])
+ extends RuntimeException(
+ SparkThrowableHelper.getMessage(errorClass, messageParameters))
+ with SparkThrowable {
+
+ override def getErrorClass: String = errorClass
+ override def getSqlState: String =
SparkThrowableHelper.getSqlState(errorClass)
+}
+
+/**
+ * NoSuch element exception thrown from Spark with an error class.
Review comment:
Nit: NoSuch -> No such
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
##########
@@ -1406,111 +1406,144 @@ object QueryExecutionErrors {
}
def cannotGetEventTimeWatermarkError(): Throwable = {
- new UnsupportedOperationException(
- "Cannot get event time watermark timestamp without setting watermark
before " +
- "[map|flatMap]GroupsWithState")
+ new SparkUnsupportedOperationException(
+ errorClass = "CANNOT_GET_EVENT_TIME_WATERMARK_ERROR",
+ messageParameters = Array.empty
+ )
}
def cannotSetTimeoutTimestampError(): Throwable = {
- new UnsupportedOperationException(
- "Cannot set timeout timestamp without enabling event time timeout in " +
- "[map|flatMapGroupsWithState")
+ new SparkUnsupportedOperationException(
+ errorClass = "CANNOT_SET_TIMEOUT_TIMESTAMP_ERROR",
+ messageParameters = Array.empty)
}
def batchMetadataFileNotFoundError(batchMetadataFile: Path): Throwable = {
- new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
+ new SparkFileNotFoundException(
+ errorClass = "BATCH_METADATA_FILE_NOT_FOUND_ERROR",
+ messageParameters = Array(batchMetadataFile.toString)
+ )
}
def multiStreamingQueriesUsingPathConcurrentlyError(
path: String, e: FileAlreadyExistsException): Throwable = {
- new ConcurrentModificationException(
- s"Multiple streaming queries are concurrently using $path", e)
+ new SparkConcurrentModificationException(
+ errorClass = "MULTI_STREAMING_QUERIES_USING_PATH_CONCURRENTLY_ERROR",
+ messageParameters = Array(path.toString), e)
}
def addFilesWithAbsolutePathUnsupportedError(commitProtocol: String):
Throwable = {
- new UnsupportedOperationException(
- s"$commitProtocol does not support adding files with an absolute path")
+ new SparkUnsupportedOperationException(
+ errorClass = "ADD_FILES_WITH_ABSOLUTE_PATH_UNSUPPORTED_ERROR",
+ messageParameters = Array(commitProtocol))
}
def microBatchUnsupportedByDataSourceError(srcName: String): Throwable = {
- new UnsupportedOperationException(
- s"Data source $srcName does not support microbatch processing.")
+ new SparkUnsupportedOperationException(
+ errorClass = "MICRO_BATCH_UNSUPPORTED_BY_DATA_SOURCE_ERROR",
+ messageParameters = Array(srcName))
}
def cannotExecuteStreamingRelationExecError(): Throwable = {
- new UnsupportedOperationException("StreamingRelationExec cannot be
executed")
+ new SparkUnsupportedOperationException(
+ errorClass = "CANNOT_EXECUTE_STREAMING_RELATION_EXEC_ERROR",
+ messageParameters = Array.empty
+ )
}
def invalidStreamingOutputModeError(outputMode: Option[OutputMode]):
Throwable = {
- new UnsupportedOperationException(s"Invalid output mode: $outputMode")
+ new SparkUnsupportedOperationException(
+ errorClass = "INVALID_STREAMING_OUTPUT_MODE_ERROR",
+ messageParameters = Array(outputMode.toString)
+ )
}
def catalogPluginClassNotFoundError(name: String): Throwable = {
- new CatalogNotFoundException(
- s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not
defined")
+ new SparkCatalogNotFoundException(
+ errorClass = "CATALOG_PLUGIN_CLASS_NOT_FOUND_ERROR",
+ messageParameters = Array(name, name))
}
def catalogPluginClassNotImplementedError(name: String, pluginClassName:
String): Throwable = {
new SparkException(
- s"Plugin class for catalog '$name' does not implement CatalogPlugin:
$pluginClassName")
+ errorClass = "CATALOG_PLUGIN_CLASS_NOT_IMPLEMENTED_ERROR",
+ messageParameters = Array(name, pluginClassName), null)
}
def catalogPluginClassNotFoundForCatalogError(
name: String,
pluginClassName: String): Throwable = {
- new SparkException(s"Cannot find catalog plugin class for catalog '$name':
$pluginClassName")
+ new SparkException(
+ errorClass = "CATALOG_PLUGIN_CLASS_NOT_FOUND_FOR_CATALOG_ERROR",
+ messageParameters = Array(name, pluginClassName), null)
}
def catalogFailToFindPublicNoArgConstructorError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException(
- s"Failed to find public no-arg constructor for catalog '$name':
$pluginClassName)", e)
+ errorClass = "CATALOG_FAIL_TO_FIND_PUBLIC_NO_ARG_CONSTRUCTOR_ERROR",
+ messageParameters = Array(name, pluginClassName), e)
}
def catalogFailToCallPublicNoArgConstructorError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException(
- s"Failed to call public no-arg constructor for catalog '$name':
$pluginClassName)", e)
+ errorClass = "CATALOG_FAIL_TO_CALL_PUBLIC_NO_ARG_CONSTRUCTOR_ERROR",
+ messageParameters = Array(name, pluginClassName), e)
}
def cannotInstantiateAbstractCatalogPluginClassError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
- new SparkException("Cannot instantiate abstract catalog plugin class for "
+
- s"catalog '$name': $pluginClassName", e.getCause)
+ new SparkException(
+ errorClass = "CANNOT_INSTANTIATE_ABSTRACT_CATALOG_PLUGIN_CLASS_ERROR",
+ messageParameters = Array(name, pluginClassName), e.getCause)
}
def failedToInstantiateConstructorForCatalogError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
- new SparkException("Failed during instantiating constructor for catalog " +
- s"'$name': $pluginClassName", e.getCause)
+ new SparkException(
+ errorClass = "FAILED_TO_INSTANTIATE_CONSTRUCTOR_FOR_CATALOG_ERROR",
+ messageParameters = Array(name, pluginClassName), e.getCause)
}
def noSuchElementExceptionError(): Throwable = {
- new NoSuchElementException
+ new SparkNoSuchElementException(
+ errorClass = "NO_SUCH_ELEMENT_EXCEPTION_ERROR",
+ messageParameters = Array.empty
+ )
}
def noSuchElementExceptionError(key: String): Throwable = {
new NoSuchElementException(key)
Review comment:
Can you also fix this one? Should be straightforward; we can use the
same errorClass as `noSuchElementExceptionError()` but change it to have
`"message": [ "%s" ]`, and pass the key here in `messageParameters`.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
##########
@@ -1406,111 +1406,144 @@ object QueryExecutionErrors {
}
def cannotGetEventTimeWatermarkError(): Throwable = {
- new UnsupportedOperationException(
- "Cannot get event time watermark timestamp without setting watermark
before " +
- "[map|flatMap]GroupsWithState")
+ new SparkUnsupportedOperationException(
+ errorClass = "CANNOT_GET_EVENT_TIME_WATERMARK_ERROR",
+ messageParameters = Array.empty
+ )
}
def cannotSetTimeoutTimestampError(): Throwable = {
- new UnsupportedOperationException(
- "Cannot set timeout timestamp without enabling event time timeout in " +
- "[map|flatMapGroupsWithState")
+ new SparkUnsupportedOperationException(
+ errorClass = "CANNOT_SET_TIMEOUT_TIMESTAMP_ERROR",
+ messageParameters = Array.empty)
}
def batchMetadataFileNotFoundError(batchMetadataFile: Path): Throwable = {
- new FileNotFoundException(s"Unable to find batch $batchMetadataFile")
+ new SparkFileNotFoundException(
+ errorClass = "BATCH_METADATA_FILE_NOT_FOUND_ERROR",
+ messageParameters = Array(batchMetadataFile.toString)
+ )
}
def multiStreamingQueriesUsingPathConcurrentlyError(
path: String, e: FileAlreadyExistsException): Throwable = {
- new ConcurrentModificationException(
- s"Multiple streaming queries are concurrently using $path", e)
+ new SparkConcurrentModificationException(
+ errorClass = "MULTI_STREAMING_QUERIES_USING_PATH_CONCURRENTLY_ERROR",
+ messageParameters = Array(path.toString), e)
}
def addFilesWithAbsolutePathUnsupportedError(commitProtocol: String):
Throwable = {
- new UnsupportedOperationException(
- s"$commitProtocol does not support adding files with an absolute path")
+ new SparkUnsupportedOperationException(
+ errorClass = "ADD_FILES_WITH_ABSOLUTE_PATH_UNSUPPORTED_ERROR",
+ messageParameters = Array(commitProtocol))
}
def microBatchUnsupportedByDataSourceError(srcName: String): Throwable = {
- new UnsupportedOperationException(
- s"Data source $srcName does not support microbatch processing.")
+ new SparkUnsupportedOperationException(
+ errorClass = "MICRO_BATCH_UNSUPPORTED_BY_DATA_SOURCE_ERROR",
+ messageParameters = Array(srcName))
}
def cannotExecuteStreamingRelationExecError(): Throwable = {
- new UnsupportedOperationException("StreamingRelationExec cannot be
executed")
+ new SparkUnsupportedOperationException(
+ errorClass = "CANNOT_EXECUTE_STREAMING_RELATION_EXEC_ERROR",
+ messageParameters = Array.empty
+ )
}
def invalidStreamingOutputModeError(outputMode: Option[OutputMode]):
Throwable = {
- new UnsupportedOperationException(s"Invalid output mode: $outputMode")
+ new SparkUnsupportedOperationException(
+ errorClass = "INVALID_STREAMING_OUTPUT_MODE_ERROR",
+ messageParameters = Array(outputMode.toString)
+ )
}
def catalogPluginClassNotFoundError(name: String): Throwable = {
- new CatalogNotFoundException(
- s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not
defined")
+ new SparkCatalogNotFoundException(
+ errorClass = "CATALOG_PLUGIN_CLASS_NOT_FOUND_ERROR",
+ messageParameters = Array(name, name))
}
def catalogPluginClassNotImplementedError(name: String, pluginClassName:
String): Throwable = {
new SparkException(
- s"Plugin class for catalog '$name' does not implement CatalogPlugin:
$pluginClassName")
+ errorClass = "CATALOG_PLUGIN_CLASS_NOT_IMPLEMENTED_ERROR",
+ messageParameters = Array(name, pluginClassName), null)
}
def catalogPluginClassNotFoundForCatalogError(
name: String,
pluginClassName: String): Throwable = {
- new SparkException(s"Cannot find catalog plugin class for catalog '$name':
$pluginClassName")
+ new SparkException(
+ errorClass = "CATALOG_PLUGIN_CLASS_NOT_FOUND_FOR_CATALOG_ERROR",
+ messageParameters = Array(name, pluginClassName), null)
}
def catalogFailToFindPublicNoArgConstructorError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException(
- s"Failed to find public no-arg constructor for catalog '$name':
$pluginClassName)", e)
+ errorClass = "CATALOG_FAIL_TO_FIND_PUBLIC_NO_ARG_CONSTRUCTOR_ERROR",
+ messageParameters = Array(name, pluginClassName), e)
}
def catalogFailToCallPublicNoArgConstructorError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException(
- s"Failed to call public no-arg constructor for catalog '$name':
$pluginClassName)", e)
+ errorClass = "CATALOG_FAIL_TO_CALL_PUBLIC_NO_ARG_CONSTRUCTOR_ERROR",
+ messageParameters = Array(name, pluginClassName), e)
}
def cannotInstantiateAbstractCatalogPluginClassError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
- new SparkException("Cannot instantiate abstract catalog plugin class for "
+
- s"catalog '$name': $pluginClassName", e.getCause)
+ new SparkException(
+ errorClass = "CANNOT_INSTANTIATE_ABSTRACT_CATALOG_PLUGIN_CLASS_ERROR",
+ messageParameters = Array(name, pluginClassName), e.getCause)
}
def failedToInstantiateConstructorForCatalogError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
- new SparkException("Failed during instantiating constructor for catalog " +
- s"'$name': $pluginClassName", e.getCause)
+ new SparkException(
+ errorClass = "FAILED_TO_INSTANTIATE_CONSTRUCTOR_FOR_CATALOG_ERROR",
+ messageParameters = Array(name, pluginClassName), e.getCause)
}
def noSuchElementExceptionError(): Throwable = {
- new NoSuchElementException
+ new SparkNoSuchElementException(
+ errorClass = "NO_SUCH_ELEMENT_EXCEPTION_ERROR",
Review comment:
We can further simplify this error class to `NO_SUCH_ELEMENT`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]