This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f5b0de07eff4 [SPARK-46991][SQL] Replace `IllegalArgumentException` by 
`SparkIllegalArgumentException` in `catalyst`
f5b0de07eff4 is described below

commit f5b0de07eff49bc1d076c4a1dc59c8672beff99e
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Sun Feb 11 15:25:28 2024 -0800

    [SPARK-46991][SQL] Replace `IllegalArgumentException` by 
`SparkIllegalArgumentException` in `catalyst`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to replace all `IllegalArgumentException` by 
`SparkIllegalArgumentException` in `Catalyst` code base, and introduce new 
legacy error classes with the `_LEGACY_ERROR_TEMP_` prefix.
    
    ### Why are the changes needed?
    To unify Spark SQL exception, and port Java exceptions on Spark exceptions 
with error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it can if user's code assumes some particular format of 
`IllegalArgumentException` messages.
    
    ### How was this patch tested?
    By running existing test suites like:
    ```
    $ build/sbt "core/testOnly *SparkThrowableSuite"
    $ build/sbt "test:testOnly *BufferHolderSparkSubmitSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45033 from MaxGekk/migrate-IllegalArgumentException-catalyst.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../src/main/resources/error/error-classes.json    | 255 +++++++++++++++++++++
 .../scala/org/apache/spark/SparkException.scala    |  25 +-
 .../connect/client/GrpcExceptionConverter.scala    |   1 +
 .../sql/catalyst/expressions/ExpressionInfo.java   |  38 +--
 .../catalyst/expressions/codegen/BufferHolder.java |  12 +-
 .../sql/catalyst/expressions/xml/UDFXPathUtil.java |   4 +-
 .../catalog/SupportsPartitionManagement.java       |   5 +-
 .../sql/connector/util/V2ExpressionSQLBuilder.java |   4 +-
 .../spark/sql/util/CaseInsensitiveStringMap.java   |   3 +-
 .../sql/catalyst/CatalystTypeConverters.scala      |  75 ++++--
 .../spark/sql/catalyst/csv/CSVExprUtils.scala      |  18 +-
 .../spark/sql/catalyst/csv/CSVHeaderChecker.scala  |   5 +-
 .../spark/sql/catalyst/expressions/Cast.scala      |   6 +-
 .../sql/catalyst/expressions/TimeWindow.scala      |   6 +-
 .../expressions/codegen/CodeGenerator.scala        |   6 +-
 .../expressions/collectionOperations.scala         |  25 +-
 .../sql/catalyst/expressions/csvExpressions.scala  |   7 +-
 .../catalyst/expressions/datetimeExpressions.scala |  14 +-
 .../sql/catalyst/expressions/xmlExpressions.scala  |   7 +-
 .../ReplaceNullWithFalseInPredicate.scala          |  11 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |   8 +-
 .../spark/sql/catalyst/plans/joinTypes.scala       |  16 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |   6 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |   8 +-
 .../spark/sql/catalyst/util/IntervalUtils.scala    |  39 ++--
 .../spark/sql/catalyst/xml/StaxXmlGenerator.scala  |   9 +-
 .../spark/sql/catalyst/xml/StaxXmlParser.scala     |  22 +-
 .../spark/sql/catalyst/xml/XmlInferSchema.scala    |   5 +-
 .../sql/connector/catalog/CatalogV2Util.scala      |  32 ++-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  28 +--
 .../sql/catalyst/CatalystTypeConvertersSuite.scala |  74 +++---
 .../spark/sql/catalyst/csv/CSVExprUtilsSuite.scala |  42 ++--
 .../sql/catalyst/expressions/TimeWindowSuite.scala |  18 +-
 .../codegen/BufferHolderSparkSubmitSuite.scala     |  12 +-
 .../expressions/codegen/BufferHolderSuite.scala    |  22 +-
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     |  24 +-
 .../sql/util/CaseInsensitiveStringMapSuite.scala   |  11 +-
 .../execution/datasources/v2/AlterTableExec.scala  |   3 +-
 .../resources/sql-tests/results/ansi/date.sql.out  |   5 +-
 .../sql/expressions/ExpressionInfoSuite.scala      |  84 ++++---
 40 files changed, 729 insertions(+), 266 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 4fcf9248d3e2..5884c9267119 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -7512,6 +7512,261 @@
       "Failed to create column family with reserved name=<colFamilyName>"
     ]
   },
+  "_LEGACY_ERROR_TEMP_3198" : {
+    "message" : [
+      "Cannot grow BufferHolder by size <neededSize> because the size is 
negative"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3199" : {
+    "message" : [
+      "Cannot grow BufferHolder by size <neededSize> because the size after 
growing exceeds size limitation <arrayMax>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3200" : {
+    "message" : [
+      "Read-ahead limit < 0"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3201" : {
+    "message" : [
+      "'note' is malformed in the expression [<exprName>]. It should start 
with a newline and 4 leading spaces; end with a newline and two spaces; 
however, got [<note>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3202" : {
+    "message" : [
+      "'group' is malformed in the expression [<exprName>]. It should be a 
value in <validGroups>; however, got <group>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3203" : {
+    "message" : [
+      "'source' is malformed in the expression [<exprName>]. It should be a 
value in <validSources>; however, got [<source>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3204" : {
+    "message" : [
+      "'since' is malformed in the expression [<exprName>]. It should not 
start with a negative number; however, got [<since>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3205" : {
+    "message" : [
+      "'deprecated' is malformed in the expression [<exprName>]. It should 
start with a newline and 4 leading spaces; end with a newline and two spaces; 
however, got [<deprecated>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3206" : {
+    "message" : [
+      "<value> is not a boolean string."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3207" : {
+    "message" : [
+      "Unexpected V2 expression: <expr>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3208" : {
+    "message" : [
+      "The number of fields (<numFields>) in the partition identifier is not 
equal to the partition schema length (<schemaLen>). The identifier might not 
refer to one partition."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3209" : {
+    "message" : [
+      "Illegal input for day of week: <string>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3210" : {
+    "message" : [
+      "Interval string does not match second-nano format of ss.nnnnnnnnn"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3211" : {
+    "message" : [
+      "Error parsing interval day-time string: <msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3212" : {
+    "message" : [
+      "Cannot support (interval '<input>' <from> to <to>) expression"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3213" : {
+    "message" : [
+      "Error parsing interval <interval> string: <msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3214" : {
+    "message" : [
+      "Interval string does not match <intervalStr> format of 
<supportedFormat> when cast to <typeName>: <input><fallBackNotice>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3215" : {
+    "message" : [
+      "Expected a Boolean type expression in replaceNullWithFalse, but got the 
type <dataType> in <expr>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3216" : {
+    "message" : [
+      "Unsupported join type '<typ>'. Supported join types include: 
<supported>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3217" : {
+    "message" : [
+      "Unsupported as-of join direction '<direction>'. Supported as-of join 
direction include: <supported>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3218" : {
+    "message" : [
+      "Must be 2 children: <others>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3219" : {
+    "message" : [
+      "The value (<other>) of the type (<otherClass>) cannot be converted to 
the <dataType> type."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3220" : {
+    "message" : [
+      "The value (<other>) of the type (<otherClass>) cannot be converted to 
an array of <elementType>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3221" : {
+    "message" : [
+      "The value (<other>) of the type (<otherClass>) cannot be converted to a 
map type with key type (<keyType>) and value type (<valueType>)"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3222" : {
+    "message" : [
+      "Only literals are allowed in the partition spec, but got <expr>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3223" : {
+    "message" : [
+      "Cannot find field: <name> in <dataType>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3224" : {
+    "message" : [
+      "Cannot delete array element"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3225" : {
+    "message" : [
+      "Cannot delete map value"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3226" : {
+    "message" : [
+      "Cannot delete map key"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3227" : {
+    "message" : [
+      "Cannot find field: <fieldName>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3228" : {
+    "message" : [
+      "AFTER column not found: <afterCol>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3229" : {
+    "message" : [
+      "Not a struct: <name>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3230" : {
+    "message" : [
+      "Field not found: <name>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3231" : {
+    "message" : [
+      "Intervals greater than a month is not supported (<interval>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3232" : {
+    "message" : [
+      "Unknown EvalMode value: <other>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3233" : {
+    "message" : [
+      "cannot generate code for unsupported type: <dataType>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3234" : {
+    "message" : [
+      "Unsupported input type <other>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3235" : {
+    "message" : [
+      "The numbers of zipped arrays and field names should be the same"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3236" : {
+    "message" : [
+      "Unsupported special character for delimiter: <str>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3237" : {
+    "message" : [
+      "Delimiter cannot be more than one character: <str>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3238" : {
+    "message" : [
+      "Failed to convert value <v> (class of <class>) in type <dt> to XML."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3239" : {
+    "message" : [
+      "Failed to parse data with unexpected event <e>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3240" : {
+    "message" : [
+      "Failed to parse a value for data type <dt> with event <e>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3241" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3242" : {
+    "message" : [
+      "sequence step must be an <intervalType> of day granularity if start and 
end values are dates"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3243" : {
+    "message" : [
+      "Illegal sequence boundaries: <start> to <stop> by <step>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3244" : {
+    "message" : [
+      "Unsupported type: <castType>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3245" : {
+    "message" : [
+      "For input string: <s>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3246" : {
+    "message" : [
+      "Failed to parse a value for data type <dataType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3247" : {
+    "message" : [
+      "Delimiter cannot be empty string"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3248" : {
+    "message" : [
+      "Single backslash is prohibited. It has special meaning as beginning of 
an escape sequence. To get the backslash character, pass a string with two 
backslashes as the delimiter."
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       "<errorMessage>"
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala 
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 899082e550f9..e73042d55819 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -397,9 +397,9 @@ private[spark] class SparkIllegalArgumentException private(
   def this(
     errorClass: String,
     messageParameters: Map[String, String],
-    context: Array[QueryContext] = Array.empty,
-    summary: String = "",
-    cause: Throwable = null) = {
+    context: Array[QueryContext],
+    summary: String,
+    cause: Throwable) = {
     this(
       SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
       Option(cause),
@@ -409,6 +409,25 @@ private[spark] class SparkIllegalArgumentException private(
     )
   }
 
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable) =
+    this(errorClass, messageParameters, Array.empty[QueryContext], "", cause)
+
+  def this(
+    errorClass: String,
+    messageParameters: Map[String, String]) =
+    this(errorClass, messageParameters, cause = null)
+
+  def this(
+    errorClass: String,
+    messageParameters: java.util.Map[String, String]) =
+    this(errorClass, messageParameters.asScala.toMap)
+
+  def this(errorClass: String) =
+    this(errorClass, messageParameters = Map.empty[String, String], cause = 
null)
+
   override def getMessageParameters: java.util.Map[String, String] = 
messageParameters.asJava
 
   override def getErrorClass: String = errorClass.orNull
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 8abfd06d4a3f..3247bc643802 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -231,6 +231,7 @@ private[client] object GrpcExceptionConverter {
         errorClass = params.errorClass.getOrElse("_LEGACY_ERROR_TEMP_3105"),
         messageParameters = errorParamsToMessageParameters(params),
         params.queryContext,
+        summary = "",
         cause = params.cause.orNull)),
     errorConstructor[ArithmeticException](params =>
       new SparkArithmeticException(
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
index ffc3c8eaf8f8..3683507d6f72 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
@@ -21,10 +21,13 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.spark.SparkIllegalArgumentException;
+
 /**
- * Expression information, will be used to describe a expression.
+ * Expression information, will be used to describe an expression.
  */
 public class ExpressionInfo {
     private String className;
@@ -145,36 +148,37 @@ public class ExpressionInfo {
         }
         if (!note.isEmpty()) {
             if (!note.contains("    ") || !note.endsWith("  ")) {
-                throw new IllegalArgumentException("'note' is malformed in the 
expression [" +
-                    this.name + "]. It should start with a newline and 4 
leading spaces; end " +
-                    "with a newline and two spaces; however, got [" + note + 
"].");
+                throw new SparkIllegalArgumentException(
+                  "_LEGACY_ERROR_TEMP_3201", Map.of("exprName", this.name, 
"note", note));
             }
             this.extended += "\n    Note:\n      " + note.trim() + "\n";
         }
         if (!group.isEmpty() && !validGroups.contains(group)) {
-            throw new IllegalArgumentException("'group' is malformed in the 
expression [" +
-                this.name + "]. It should be a value in " + validGroups + "; 
however, " +
-                "got [" + group + "].");
+            throw new SparkIllegalArgumentException(
+              "_LEGACY_ERROR_TEMP_3202",
+              Map.of("exprName", this.name,
+                "validGroups", 
String.valueOf(validGroups.stream().sorted().toList()),
+                "group", group));
         }
         if (!source.isEmpty() && !validSources.contains(source)) {
-            throw new IllegalArgumentException("'source' is malformed in the 
expression [" +
-                    this.name + "]. It should be a value in " + validSources + 
"; however, " +
-                    "got [" + source + "].");
+            throw new SparkIllegalArgumentException(
+              "_LEGACY_ERROR_TEMP_3203",
+              Map.of("exprName", this.name,
+                "validSources", 
String.valueOf(validSources.stream().sorted().toList()),
+                "source", source));
         }
         if (!since.isEmpty()) {
             if (Integer.parseInt(since.split("\\.")[0]) < 0) {
-                throw new IllegalArgumentException("'since' is malformed in 
the expression [" +
-                    this.name + "]. It should not start with a negative 
number; however, " +
-                    "got [" + since + "].");
+                throw new SparkIllegalArgumentException(
+                  "_LEGACY_ERROR_TEMP_3204", Map.of("exprName", this.name, 
"since", since));
             }
             this.extended += "\n    Since: " + since + "\n";
         }
         if (!deprecated.isEmpty()) {
             if (!deprecated.contains("    ") || !deprecated.endsWith("  ")) {
-                throw new IllegalArgumentException("'deprecated' is malformed 
in the " +
-                    "expression [" + this.name + "]. It should start with a 
newline and 4 " +
-                    "leading spaces; end with a newline and two spaces; 
however, got [" +
-                    deprecated + "].");
+                throw new SparkIllegalArgumentException(
+                  "_LEGACY_ERROR_TEMP_3205",
+                  Map.of("exprName", this.name, "deprecated", deprecated));
             }
             this.extended += "\n    Deprecated:\n      " + deprecated.trim() + 
"\n";
         }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 4b11f2da8b22..c3a3cbdae8d5 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen;
 
 import java.util.Map;
 
+import org.apache.spark.SparkIllegalArgumentException;
 import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.unsafe.Platform;
@@ -67,13 +68,14 @@ final class BufferHolder {
    */
   void grow(int neededSize) {
     if (neededSize < 0) {
-      throw new IllegalArgumentException(
-        "Cannot grow BufferHolder by size " + neededSize + " because the size 
is negative");
+      throw new SparkIllegalArgumentException(
+        "_LEGACY_ERROR_TEMP_3198",
+        Map.of("neededSize", String.valueOf(neededSize)));
     }
     if (neededSize > ARRAY_MAX - totalSize()) {
-      throw new IllegalArgumentException(
-        "Cannot grow BufferHolder by size " + neededSize + " because the size 
after growing " +
-          "exceeds size limitation " + ARRAY_MAX);
+      throw new SparkIllegalArgumentException(
+        "_LEGACY_ERROR_TEMP_3199",
+        Map.of("neededSize", String.valueOf(neededSize), "arrayMax", 
String.valueOf(ARRAY_MAX)));
     }
     final int length = totalSize() + neededSize;
     if (buffer.length < length) {
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
index e9f18229b54c..3e94b332160c 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java
@@ -34,6 +34,8 @@ import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.xml.sax.InputSource;
 
+import org.apache.spark.SparkIllegalArgumentException;
+
 /**
  * Utility class for all XPath UDFs. Each UDF instance should keep an instance 
of this class.
  *
@@ -200,7 +202,7 @@ public class UDFXPathUtil {
     @Override
     public void mark(int readAheadLimit) throws IOException {
       if (readAheadLimit < 0) {
-        throw new IllegalArgumentException("Read-ahead limit < 0");
+        throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3200");
       }
       ensureOpen();
       mark = next;
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
index 48cf2a0aee1e..3c130a0aa87e 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsPartitionManagement.java
@@ -104,9 +104,8 @@ public interface SupportsPartitionManagement extends Table {
       if (ident.numFields() == partitionNames.length) {
         return listPartitionIdentifiers(partitionNames, ident).length > 0;
       } else {
-        throw new IllegalArgumentException("The number of fields (" + 
ident.numFields() +
-          ") in the partition identifier is not equal to the partition schema 
length (" +
-          partitionNames.length + "). The identifier might not refer to one 
partition.");
+        throw QueryExecutionErrors.partitionNumMismatchError(
+          ident.numFields(), partitionNames.length);
       }
     }
 
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 1035d2da0240..fd1b8f5dd1ee 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
 
+import org.apache.spark.SparkIllegalArgumentException;
 import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.sql.connector.expressions.Cast;
 import org.apache.spark.sql.connector.expressions.Expression;
@@ -305,7 +306,8 @@ public class V2ExpressionSQLBuilder {
   }
 
   protected String visitUnexpectedExpr(Expression expr) throws 
IllegalArgumentException {
-    throw new IllegalArgumentException("Unexpected V2 expression: " + expr);
+    throw new SparkIllegalArgumentException(
+      "_LEGACY_ERROR_TEMP_3207", Map.of("expr", String.valueOf(expr)));
   }
 
   protected String visitOverlay(String[] inputs) {
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
index 5e0c2f993a41..1c5c38ba705f 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.util;
 
+import org.apache.spark.SparkIllegalArgumentException;
 import org.apache.spark.SparkUnsupportedOperationException;
 import org.apache.spark.annotation.Experimental;
 import org.slf4j.Logger;
@@ -145,7 +146,7 @@ public class CaseInsensitiveStringMap implements 
Map<String, String> {
     } else if (value.equalsIgnoreCase("false")) {
       return false;
     } else {
-      throw new IllegalArgumentException(value + " is not a boolean string.");
+      throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3206", 
Map.of("value", value));
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 7603a002d640..32e0c5884ebe 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -27,6 +27,7 @@ import javax.annotation.Nullable
 
 import scala.language.existentials
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
@@ -174,9 +175,12 @@ object CatalystTypeConverters {
             convertedIterable += elementConverter.toCatalyst(item)
           }
           new GenericArrayData(convertedIterable.toArray)
-        case other => throw new IllegalArgumentException(
-          s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-            + s"cannot be converted to an array of 
${elementType.catalogString}")
+        case other => throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3220",
+          messageParameters = scala.collection.immutable.Map(
+            "other" -> other.toString,
+            "otherClass" -> other.getClass.getCanonicalName,
+            "elementType" -> elementType.catalogString))
       }
     }
 
@@ -213,10 +217,13 @@ object CatalystTypeConverters {
       scalaValue match {
         case map: Map[_, _] => ArrayBasedMapData(map, keyFunction, 
valueFunction)
         case javaMap: JavaMap[_, _] => ArrayBasedMapData(javaMap, keyFunction, 
valueFunction)
-        case other => throw new IllegalArgumentException(
-          s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-            + "cannot be converted to a map type with "
-            + s"key type (${keyType.catalogString}) and value type 
(${valueType.catalogString})")
+        case other => throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3221",
+          messageParameters = scala.collection.immutable.Map(
+            "other" -> other.toString,
+            "otherClass" -> other.getClass.getCanonicalName,
+            "keyType" -> keyType.catalogString,
+            "valueType" -> valueType.catalogString))
       }
     }
 
@@ -263,9 +270,12 @@ object CatalystTypeConverters {
           idx += 1
         }
         new GenericInternalRow(ar)
-      case other => throw new IllegalArgumentException(
-        s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to ${structType.catalogString}")
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> structType.catalogString))
     }
 
     override def toScala(row: InternalRow): Row = {
@@ -292,9 +302,12 @@ object CatalystTypeConverters {
       case utf8: UTF8String => utf8
       case chr: Char => UTF8String.fromString(chr.toString)
       case ac: Array[Char] => UTF8String.fromString(String.valueOf(ac))
-      case other => throw new IllegalArgumentException(
-        s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to the string type")
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> StringType.sql))
     }
     override def toScala(catalystValue: UTF8String): String =
       if (catalystValue == null) null else catalystValue.toString
@@ -306,9 +319,12 @@ object CatalystTypeConverters {
     override def toCatalystImpl(scalaValue: Any): Int = scalaValue match {
       case d: Date => DateTimeUtils.fromJavaDate(d)
       case l: LocalDate => DateTimeUtils.localDateToDays(l)
-      case other => throw new IllegalArgumentException(
-        s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to the ${DateType.sql} type")
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> DateType.sql))
     }
     override def toScala(catalystValue: Any): Date =
       if (catalystValue == null) null else 
DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
@@ -331,9 +347,12 @@ object CatalystTypeConverters {
     override def toCatalystImpl(scalaValue: Any): Long = scalaValue match {
       case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
       case i: Instant => DateTimeUtils.instantToMicros(i)
-      case other => throw new IllegalArgumentException(
-        s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to the ${TimestampType.sql} type")
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> TimestampType.sql))
     }
     override def toScala(catalystValue: Any): Timestamp =
       if (catalystValue == null) null
@@ -356,9 +375,12 @@ object CatalystTypeConverters {
     extends CatalystTypeConverter[Any, LocalDateTime, Any] {
     override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
       case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l)
-      case other => throw new IllegalArgumentException(
-        s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to the ${TimestampNTZType.sql} type")
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3219",
+        messageParameters = scala.collection.immutable.Map(
+          "other" -> other.toString,
+          "otherClass" -> other.getClass.getCanonicalName,
+          "dataType" -> TimestampNTZType.sql))
     }
 
     override def toScala(catalystValue: Any): LocalDateTime =
@@ -380,9 +402,12 @@ object CatalystTypeConverters {
         case d: JavaBigDecimal => Decimal(d)
         case d: JavaBigInteger => Decimal(d)
         case d: Decimal => d
-        case other => throw new IllegalArgumentException(
-          s"The value (${other.toString}) of the type 
(${other.getClass.getCanonicalName}) "
-            + s"cannot be converted to ${dataType.catalogString}")
+        case other => throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3219",
+          messageParameters = scala.collection.immutable.Map(
+            "other" -> other.toString,
+            "otherClass" -> other.getClass.getCanonicalName,
+            "dataType" -> dataType.catalogString))
       }
       decimal.toPrecision(dataType.precision, dataType.scale, 
Decimal.ROUND_HALF_UP, nullOnOverflow)
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
index b800caccbf85..60113d6b3e12 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.csv
 
 import org.apache.commons.lang3.StringUtils
 
+import org.apache.spark.SparkIllegalArgumentException
+
 object CSVExprUtils {
   /**
    * Filter ignorable rows for CSV iterator (lines empty and starting with 
`comment`).
@@ -63,13 +65,11 @@ object CSVExprUtils {
    * It handles some Java escaped strings and throws exception if given string 
is longer than one
    * character.
    */
-  @throws[IllegalArgumentException]
+  @throws[SparkIllegalArgumentException]
   def toChar(str: String): Char = {
     (str: Seq[Char]) match {
-      case Seq() => throw new IllegalArgumentException("Delimiter cannot be 
empty string")
-      case Seq('\\') => throw new IllegalArgumentException("Single backslash 
is prohibited." +
-        " It has special meaning as beginning of an escape sequence." +
-        " To get the backslash character, pass a string with two backslashes 
as the delimiter.")
+      case Seq() => throw new 
SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3247")
+      case Seq('\\') => throw new 
SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3248")
       case Seq(c) => c
       case Seq('\\', 't') => '\t'
       case Seq('\\', 'r') => '\r'
@@ -81,9 +81,11 @@ object CSVExprUtils {
       case Seq('\\', '\\') => '\\'
       case _ if str == "\u0000" => '\u0000'
       case Seq('\\', _) =>
-        throw new IllegalArgumentException(s"Unsupported special character for 
delimiter: $str")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3236", messageParameters = 
Map("str" -> str))
       case _ =>
-        throw new IllegalArgumentException(s"Delimiter cannot be more than one 
character: $str")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3237", messageParameters = 
Map("str" -> str))
     }
   }
 
@@ -110,7 +112,7 @@ object CSVExprUtils {
    *
    * @param str the string representing the sequence of separator characters
    * @return a [[String]] representing the multi-character delimiter
-   * @throws IllegalArgumentException if any of the individual input chunks 
are illegal
+   * @throws SparkIllegalArgumentException if any of the individual input 
chunks are illegal
    */
   def toDelimiterStr(str: String): String = {
     var idx = 0
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
index 9366c62792c8..43336dbfa65c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import com.univocity.parsers.csv.CsvParser
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -92,7 +93,9 @@ class CSVHeaderChecker(
         if (enforceSchema) {
           logWarning(msg)
         } else {
-          throw new IllegalArgumentException(msg)
+          throw new SparkIllegalArgumentException(
+            errorClass = "_LEGACY_ERROR_TEMP_3241",
+            messageParameters = Map("msg" -> msg))
         }
       }
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index f82f22772369..eae112a6a398 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -21,7 +21,7 @@ import java.time.{ZoneId, ZoneOffset}
 import java.util.Locale
 import java.util.concurrent.TimeUnit._
 
-import org.apache.spark.{QueryContext, SparkArithmeticException}
+import org.apache.spark.{QueryContext, SparkArithmeticException, 
SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -507,7 +507,9 @@ case class Cast(
       case EvalMode.LEGACY => Cast.canCast(child.dataType, dataType)
       case EvalMode.ANSI => Cast.canAnsiCast(child.dataType, dataType)
       case EvalMode.TRY => Cast.canTryCast(child.dataType, dataType)
-      case other => throw new IllegalArgumentException(s"Unknown EvalMode 
value: $other")
+      case other => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3232",
+        messageParameters = Map("other" -> other.toString))
     }
     if (canCast) {
       TypeCheckResult.TypeCheckSuccess
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index bc9b7de7464e..fd2e302deb99 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodeGenerator, ExprCode}
@@ -199,8 +200,9 @@ object TimeWindow {
   def getIntervalInMicroSeconds(interval: String): Long = {
     val cal = IntervalUtils.fromIntervalString(interval)
     if (cal.months != 0) {
-      throw new IllegalArgumentException(
-        s"Intervals greater than a month is not supported ($interval).")
+      throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3231",
+        messageParameters = Map("interval" -> interval))
     }
     Math.addExact(Math.multiplyExact(cal.days, MICROS_PER_DAY), 
cal.microseconds)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 5d04b0d6d95a..4c1a86292d70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -30,7 +30,7 @@ import org.codehaus.commons.compiler.{CompileException, 
InternalCompilerExceptio
 import org.codehaus.janino.ClassBodyEvaluator
 import org.codehaus.janino.util.ClassFile
 
-import org.apache.spark.{SparkException, TaskContext, TaskKilledException}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException, 
TaskContext, TaskKilledException}
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
@@ -1771,7 +1771,9 @@ object CodeGenerator extends Logging {
       case CalendarIntervalType => s"$vector.putInterval($rowId, $value);"
       case t: StringType => s"$vector.putByteArray($rowId, $value.getBytes());"
       case _ =>
-        throw new IllegalArgumentException(s"cannot generate code for 
unsupported type: $dataType")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3233",
+          messageParameters = Map("dataType" -> dataType.toString))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 5aa96dd1a6aa..92dd79fd59e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -22,7 +22,7 @@ import java.util.Comparator
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
-import org.apache.spark.{QueryContext, SparkException}
+import org.apache.spark.{QueryContext, SparkException, 
SparkIllegalArgumentException}
 import org.apache.spark.SparkException.internalError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, 
UnresolvedAttribute, UnresolvedSeed}
@@ -302,8 +302,7 @@ case class ArraysZip(children: Seq[Expression], names: 
Seq[Expression])
   }
 
   if (children.size != names.size) {
-    throw new IllegalArgumentException(
-      "The numbers of zipped arrays and field names should be the same")
+    throw new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3235")
   }
 
   final override val nodePatterns: Seq[TreePattern] = Seq(ARRAYS_ZIP)
@@ -3353,8 +3352,9 @@ object Sequence {
       val (stepMonths, stepDays, stepMicros) = splitStep(input3)
 
       if (scale == MICROS_PER_DAY && stepMonths == 0 && stepDays == 0) {
-        throw new IllegalArgumentException(s"sequence step must be an 
${intervalType.typeName}" +
-          " of day granularity if start and end values are dates")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3242",
+          messageParameters = Map("intervalType" -> intervalType.typeName))
       }
 
       if (stepMonths == 0 && stepMicros == 0 && scale == MICROS_PER_DAY) {
@@ -3446,9 +3446,10 @@ object Sequence {
       val check = if (scale == MICROS_PER_DAY) {
         s"""
            |if ($stepMonths == 0 && $stepDays == 0) {
-           |  throw new IllegalArgumentException(
-           |    "sequence step must be an ${intervalType.typeName} " +
-           |    "of day granularity if start and end values are dates");
+           |  java.util.Map<String, String> params = new 
java.util.HashMap<String, String>();
+           |  params.put("intervalType", "${intervalType.typeName}");
+           |  throw new org.apache.spark.SparkIllegalArgumentException(
+           |    "_LEGACY_ERROR_TEMP_3242", params);
            |}
          """.stripMargin
         } else {
@@ -3539,8 +3540,12 @@ object Sequence {
        |if (!(($estimatedStep > 0 && $start <= $stop) ||
        |  ($estimatedStep < 0 && $start >= $stop) ||
        |  ($estimatedStep == 0 && $start == $stop))) {
-       |  throw new IllegalArgumentException(
-       |    "Illegal sequence boundaries: " + $start + " to " + $stop + " by " 
+ $step);
+       |  java.util.Map<String, String> params = new java.util.HashMap<String, 
String>();
+       |  params.put("start", $start);
+       |  params.put("stop", $stop);
+       |  params.put("step", $step);
+       |  throw new org.apache.spark.SparkIllegalArgumentException(
+       |    "_LEGACY_ERROR_TEMP_3243", params);
        |}
        |int $len = $calcFn((long) $start, (long) $stop, (long) $estimatedStep);
        """.stripMargin
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index a7c9e2946d7b..a2d17617a10f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -21,7 +21,7 @@ import java.io.CharArrayWriter
 
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -266,8 +266,9 @@ case class StructsToCsv(
   @transient
   lazy val inputSchema: StructType = child.dataType match {
     case st: StructType => st
-    case other =>
-      throw new IllegalArgumentException(s"Unsupported input type 
${other.catalogString}")
+    case other => throw new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_3234",
+      messageParameters = Map("other" -> other.catalogString))
   }
 
   @transient
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 997d2d7420a3..a9155e8daf10 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -24,7 +24,7 @@ import java.util.Locale
 
 import org.apache.commons.text.StringEscapeUtils
 
-import org.apache.spark.SparkDateTimeException
+import org.apache.spark.{SparkDateTimeException, SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, 
FunctionRegistry}
 import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -1519,7 +1519,7 @@ case class LastDay(startDate: Expression)
     """_FUNC_(start_date, day_of_week) - Returns the first date which is later 
than `start_date` and named as indicated.
       The function returns NULL if at least one of the input parameters is 
NULL.
       When both of the input parameters are not NULL and day_of_week is an 
invalid input,
-      the function throws IllegalArgumentException if `spark.sql.ansi.enabled` 
is set to true, otherwise NULL.
+      the function throws SparkIllegalArgumentException if 
`spark.sql.ansi.enabled` is set to true, otherwise NULL.
       """,
   examples = """
     Examples:
@@ -1551,9 +1551,9 @@ case class NextDay(
       val sd = start.asInstanceOf[Int]
       DateTimeUtils.getNextDateForDayOfWeek(sd, dow)
     } catch {
-      case e: IllegalArgumentException =>
+      case e: SparkIllegalArgumentException =>
         if (failOnError) {
-          throw QueryExecutionErrors.ansiIllegalArgumentError(e)
+          throw e
         } else {
           null
         }
@@ -1568,7 +1568,7 @@ case class NextDay(
       sd: String,
       dowS: String): String = {
     val failOnErrorBranch = if (failOnError) {
-      "throw QueryExecutionErrors.ansiIllegalArgumentError(e);"
+      "throw e;"
     } else {
       s"${ev.isNull} = true;"
     }
@@ -1576,7 +1576,7 @@ case class NextDay(
      |try {
      |  int $dayOfWeekTerm = $dateTimeUtilClass.getDayOfWeekFromString($dowS);
      |  ${ev.value} = $dateTimeUtilClass.getNextDateForDayOfWeek($sd, 
$dayOfWeekTerm);
-     |} catch (IllegalArgumentException e) {
+     |} catch (org.apache.spark.SparkIllegalArgumentException e) {
      |  $failOnErrorBranch
      |}
      |""".stripMargin
@@ -1594,7 +1594,7 @@ case class NextDay(
             val dayOfWeekValue = DateTimeUtils.getDayOfWeekFromString(input)
             s"${ev.value} = $dateTimeUtilClass.getNextDateForDayOfWeek($sd, 
$dayOfWeekValue);"
           } catch {
-            case _: IllegalArgumentException => nextDayGenCode(ev, 
dayOfWeekTerm, sd, dowS)
+            case _: SparkIllegalArgumentException => nextDayGenCode(ev, 
dayOfWeekTerm, sd, dowS)
           }
         }
       } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
index 27c0a09fa1f0..800515ca84b5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xmlExpressions.scala
@@ -18,10 +18,10 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.io.CharArrayWriter
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, ExpressionDescription, ExprUtils, NullIntolerant, 
TimeZoneAwareExpression, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util.{ArrayData, FailFastMode, 
FailureSafeParser, GenericArrayData, PermissiveMode}
 import org.apache.spark.sql.catalyst.xml.{StaxXmlGenerator, StaxXmlParser, 
ValidatorUtil, XmlInferSchema, XmlOptions}
@@ -283,8 +283,9 @@ case class StructsToXml(
   @transient
   lazy val inputSchema: StructType = child.dataType match {
     case st: StructType => st
-    case other =>
-      throw new IllegalArgumentException(s"Unsupported input type 
${other.catalogString}")
+    case other => throw new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_3234",
+      messageParameters = Map("other" -> other.catalogString))
   }
 
   @transient
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index c8fe6c1524b2..73972cae82de 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists, 
ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet, 
LambdaFunction, Literal, MapFilter, Not, Or}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, 
DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan, 
MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction, 
UpdateTable, WriteDelta}
@@ -130,11 +131,15 @@ object ReplaceNullWithFalseInPredicate extends 
Rule[LogicalPlan] {
     case e if e.dataType == BooleanType =>
       e
     case e =>
-      val message = "Expected a Boolean type expression in 
replaceNullWithFalse, " +
-        s"but got the type `${e.dataType.catalogString}` in `${e.sql}`."
       if (Utils.isTesting) {
-        throw new IllegalArgumentException(message)
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3215",
+          messageParameters = Map(
+            "dataType" -> e.dataType.catalogString,
+            "expr" -> e.sql))
       } else {
+        val message = "Expected a Boolean type expression in 
replaceNullWithFalse, " +
+          s"but got the type `${e.dataType.catalogString}` in `${e.sql}`."
         logWarning(message)
         e
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 6ab2394777a2..95bfba191d92 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -30,7 +30,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, 
TerminalNode}
 import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
 
-import org.apache.spark.{SparkArithmeticException, SparkException}
+import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkIllegalArgumentException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -674,8 +674,10 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
           Cast(l, StringType, Some(conf.sessionLocalTimeZone)).eval().toString
         }
       case other =>
-        throw new IllegalArgumentException(s"Only literals are allowed in the 
" +
-          s"partition spec, but got ${other.sql}")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3222",
+          messageParameters = Map("expr" -> other.sql)
+        )
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 3744613dacc0..f123258683ec 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans
 
 import java.util.Locale
 
-import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.{SparkIllegalArgumentException, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 
 object JoinType {
@@ -41,8 +41,11 @@ object JoinType {
         "leftanti", "left_anti", "anti",
         "cross")
 
-      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
-        "Supported join types include: " + supported.mkString("'", "', '", 
"'") + ".")
+      throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3216",
+        messageParameters = Map(
+          "typ" -> typ,
+          "supported" -> supported.mkString("'", "', '", "'")))
   }
 }
 
@@ -133,8 +136,11 @@ object AsOfJoinDirection {
       case "nearest" => Nearest
       case _ =>
         val supported = Seq("forward", "backward", "nearest")
-        throw new IllegalArgumentException(s"Unsupported as-of join direction 
'$direction'. " +
-          "Supported as-of join direction include: " + supported.mkString("'", 
"', '", "'") + ".")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3217",
+          messageParameters = Map(
+            "direction" -> direction,
+            "supported" -> supported.mkString("'", "', '", "'")))
     }
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 12ac83b64b78..37e751ea9884 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.SparkUnsupportedOperationException
+import org.apache.spark.{SparkIllegalArgumentException, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, 
AssignmentUtils, EliminateSubqueryAliases, FieldName, NamedRelation, 
PartitionSpec, ResolvedIdentifier, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -442,7 +442,9 @@ trait V2CreateTableAsSelectPlan
       case Seq(newName, newQuery) =>
         withNameAndQuery(newName, newQuery)
       case others =>
-        throw new IllegalArgumentException("Must be 2 children: " + others)
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3218",
+          messageParameters = Map("others" -> others.toString()))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 01503324048b..e31ccdb81825 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.{QueryContext, SparkException}
+import org.apache.spark.{QueryContext, SparkException, 
SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{Decimal, DoubleExactNumeric, 
TimestampNTZType, TimestampType}
@@ -375,7 +375,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
   /**
    * Returns day of week from String. Starting from Thursday, marked as 0.
    * (Because 1970-01-01 is Thursday).
-   * @throws IllegalArgumentException if the input is not a valid day of week.
+   * @throws SparkIllegalArgumentException if the input is not a valid day of 
week.
    */
   def getDayOfWeekFromString(string: UTF8String): Int = {
     val dowString = string.toString.toUpperCase(Locale.ROOT)
@@ -388,7 +388,9 @@ object DateTimeUtils extends SparkDateTimeUtils {
       case "FR" | "FRI" | "FRIDAY" => FRIDAY
       case "SA" | "SAT" | "SATURDAY" => SATURDAY
       case _ =>
-        throw new IllegalArgumentException(s"""Illegal input for day of week: 
$string""")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3209",
+          messageParameters = Map("string" -> string.toString))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
index e051cfc37f12..a016fd7d5881 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
@@ -105,11 +106,15 @@ object IntervalUtils extends SparkIntervalUtils {
       intervalStr: String,
       typeName: String,
       fallBackNotice: Option[String] = None) = {
-    throw new IllegalArgumentException(
-      s"Interval string does not match $intervalStr format of " +
-        s"${supportedFormat((startFiled, endField)).map(format => 
s"`$format`").mkString(", ")} " +
-        s"when cast to $typeName: ${input.toString}" +
-        s"${fallBackNotice.map(s => s", $s").getOrElse("")}")
+    throw new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_3214",
+      messageParameters = Map(
+        "intervalStr" -> intervalStr,
+        "supportedFormat" -> supportedFormat((startFiled, endField))
+          .map(format => s"`$format`").mkString(", "),
+        "typeName" -> typeName,
+        "input" -> input.toString,
+        "fallBackNotice" -> fallBackNotice.map(s => s", $s").getOrElse("")))
   }
 
   val supportedFormat = Map(
@@ -198,8 +203,10 @@ object IntervalUtils extends SparkIntervalUtils {
       f
     } catch {
       case NonFatal(e) =>
-        throw new IllegalArgumentException(
-          s"Error parsing interval $interval string: ${e.getMessage}", e)
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3213",
+          messageParameters = Map("interval" -> interval, "msg" -> 
e.getMessage),
+          cause = e)
     }
   }
 
@@ -499,8 +506,12 @@ object IntervalUtils extends SparkIntervalUtils {
           secondsFraction = 0
         case DT.SECOND =>
           // No-op
-        case _ => throw new IllegalArgumentException(s"Cannot support (" +
-          s"interval '$input' ${DT.fieldToString(from)} to 
${DT.fieldToString(to)}) expression")
+        case _ => throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3212",
+          messageParameters = Map(
+            "input" -> input,
+            "from" -> DT.fieldToString(from),
+            "to" -> DT.fieldToString(to)))
       }
       var micros = secondsFraction
       micros = Math.addExact(micros, Math.multiplyExact(hours, 
MICROS_PER_HOUR))
@@ -509,8 +520,10 @@ object IntervalUtils extends SparkIntervalUtils {
       new CalendarInterval(0, sign * days, sign * micros)
     } catch {
       case e: Exception =>
-        throw new IllegalArgumentException(
-          s"Error parsing interval day-time string: ${e.getMessage}", e)
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3211",
+          messageParameters = Map("msg" -> e.getMessage),
+          cause = e)
     }
   }
 
@@ -546,9 +559,7 @@ object IntervalUtils extends SparkIntervalUtils {
       case Array(secondsStr, nanosStr) =>
         val seconds = parseSeconds(secondsStr)
         Math.addExact(seconds, parseNanos(nanosStr, seconds < 0))
-      case _ =>
-        throw new IllegalArgumentException(
-          "Interval string does not match second-nano format of ss.nnnnnnnnn")
+      case _ => throw new 
SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3210")
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
index 53c8b4cf3422..87f0b50b9af2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala
@@ -25,6 +25,7 @@ import scala.collection.Map
 import com.sun.xml.txw2.output.IndentingXMLStreamWriter
 import org.apache.hadoop.shaded.com.ctc.wstx.api.WstxOutputProperties
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateFormatter, 
DateTimeUtils, MapData, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
@@ -219,8 +220,12 @@ class StaxXmlGenerator(
       }
 
     case (_, _) =>
-      throw new IllegalArgumentException(
-        s"Failed to convert value $v (class of ${v.getClass}) in type $dt to 
XML.")
+      throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3238",
+        messageParameters = scala.collection.immutable.Map(
+          "v" -> v.toString,
+          "class" -> v.getClass.toString,
+          "dt" -> dt.toString))
   }
 
   def writeMapData(mapType: MapType, map: MapData): Unit = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 66ec636d1a65..544a761219cd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -33,7 +33,7 @@ import scala.xml.SAXException
 
 import org.apache.commons.lang3.exception.ExceptionUtils
 
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkIllegalArgumentException, SparkUpgradeException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
@@ -232,8 +232,11 @@ class StaxXmlParser(
         StaxXmlParserUtils.skipNextEndElement(parser, startElementName, 
options)
         value
       case (e: XMLEvent, dt: DataType) =>
-        throw new IllegalArgumentException(
-          s"Failed to parse a value for data type $dt with event 
${e.toString}")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3240",
+          messageParameters = Map(
+            "dt" -> dt.toString,
+            "e" -> e.toString))
     }
   }
 
@@ -467,7 +470,9 @@ class StaxXmlParser(
         case _: TimestampNTZType => 
timestampNTZFormatter.parseWithoutTimeZone(datum, false)
         case _: DateType => parseXmlDate(datum, options)
         case _: StringType => UTF8String.fromString(datum)
-        case _ => throw new IllegalArgumentException(s"Unsupported type: 
${castType.typeName}")
+        case _ => throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3244",
+          messageParameters = Map("castType" -> "castType.typeName"))
       }
     }
   }
@@ -476,7 +481,9 @@ class StaxXmlParser(
     s.toLowerCase(Locale.ROOT) match {
       case "true" | "1" => true
       case "false" | "0" => false
-      case _ => throw new IllegalArgumentException(s"For input string: $s")
+      case _ => throw new SparkIllegalArgumentException(
+        errorClass = "_LEGACY_ERROR_TEMP_3245",
+        messageParameters = Map("s" -> s))
     }
   }
 
@@ -514,8 +521,9 @@ class StaxXmlParser(
         case ShortType => castTo(value, ShortType)
         case IntegerType => signSafeToInt(value)
         case dt: DecimalType => castTo(value, dt)
-        case _ => throw new IllegalArgumentException(
-          s"Failed to parse a value for data type $dataType.")
+        case _ => throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3246",
+          messageParameters = Map("dataType" -> dataType.toString))
       }
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
index b89c4c2397aa..be5a29d299a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlInferSchema.scala
@@ -30,6 +30,7 @@ import scala.util.control.Exception._
 import scala.util.control.NonFatal
 import scala.xml.SAXException
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
@@ -209,7 +210,9 @@ class XmlInferSchema(options: XmlOptions, caseSensitive: 
Boolean)
           case _ => structType
         }
       case e: XMLEvent =>
-        throw new IllegalArgumentException(s"Failed to parse data with 
unexpected event $e")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3239",
+          messageParameters = Map("e" -> e.toString))
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 06887b0b9503..5485f5255b6e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -22,6 +22,7 @@ import java.util.Collections
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkIllegalArgumentException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.CurrentUserContext
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, 
NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, 
NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
@@ -163,7 +164,9 @@ private[sql] object CatalogV2Util {
                     addField(parentType, fieldWithComment, add.position(), 
tableProvider,
                       statementType, true)))
                 case _ =>
-                  throw new IllegalArgumentException(s"Not a struct: 
${names.init.last}")
+                  throw new SparkIllegalArgumentException(
+                    errorClass = "_LEGACY_ERROR_TEMP_3229",
+                    messageParameters = Map("name" -> names.init.last))
               })
           }
 
@@ -188,7 +191,9 @@ private[sql] object CatalogV2Util {
         case update: UpdateColumnPosition =>
           def updateFieldPos(struct: StructType, name: String): StructType = {
             val oldField = struct.fields.find(_.name == name).getOrElse {
-              throw new IllegalArgumentException("Field not found: " + name)
+              throw new SparkIllegalArgumentException(
+                errorClass = "_LEGACY_ERROR_TEMP_3230",
+                messageParameters = Map("name" -> name))
             }
             val withFieldRemoved = StructType(struct.fields.filter(_ != 
oldField))
             addField(withFieldRemoved, oldField, update.position(), 
tableProvider, statementType,
@@ -203,7 +208,9 @@ private[sql] object CatalogV2Util {
                 case parentType: StructType =>
                   Some(parent.copy(dataType = updateFieldPos(parentType, 
names.last)))
                 case _ =>
-                  throw new IllegalArgumentException(s"Not a struct: 
${names.init.last}")
+                  throw new SparkIllegalArgumentException(
+                    errorClass = "_LEGACY_ERROR_TEMP_3229",
+                    messageParameters = Map("name" -> names.init.last))
               })
           }
 
@@ -244,7 +251,9 @@ private[sql] object CatalogV2Util {
       val afterCol = position.asInstanceOf[After].column()
       val fieldIndex = schema.fields.indexWhere(_.name == afterCol)
       if (fieldIndex == -1) {
-        throw new IllegalArgumentException("AFTER column not found: " + 
afterCol)
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3228",
+          messageParameters = Map("afterCol" -> afterCol))
       }
       val (before, after) = schema.fields.splitAt(fieldIndex + 1)
       StructType(before ++ (field +: after))
@@ -267,7 +276,9 @@ private[sql] object CatalogV2Util {
         // Currently only DROP COLUMN may pass down the IF EXISTS parameter
         return struct
       } else {
-        throw new IllegalArgumentException(s"Cannot find field: 
${fieldNames.head}")
+        throw new SparkIllegalArgumentException(
+          errorClass = "_LEGACY_ERROR_TEMP_3227",
+          messageParameters = Map("fieldName" -> fieldNames.head))
       }
     }
 
@@ -283,7 +294,7 @@ private[sql] object CatalogV2Util {
 
       case (Seq("key"), map @ MapType(keyType, _, _)) =>
         val updated = update(StructField("key", keyType, nullable = false))
-            .getOrElse(throw new IllegalArgumentException(s"Cannot delete map 
key"))
+            .getOrElse(throw new 
SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3226"))
         Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
 
       case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, 
_)) =>
@@ -291,7 +302,7 @@ private[sql] object CatalogV2Util {
 
       case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
         val updated = update(StructField("value", mapValueType, nullable = 
isNullable))
-            .getOrElse(throw new IllegalArgumentException(s"Cannot delete map 
value"))
+            .getOrElse(throw new 
SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3225"))
         Some(field.copy(dataType = map.copy(
           valueType = updated.dataType,
           valueContainsNull = updated.nullable)))
@@ -302,7 +313,7 @@ private[sql] object CatalogV2Util {
 
       case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
         val updated = update(StructField("element", elementType, nullable = 
isNullable))
-            .getOrElse(throw new IllegalArgumentException(s"Cannot delete 
array element"))
+            .getOrElse(throw new 
SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_3224"))
         Some(field.copy(dataType = array.copy(
           elementType = updated.dataType,
           containsNull = updated.nullable)))
@@ -313,8 +324,9 @@ private[sql] object CatalogV2Util {
 
       case (names, dataType) =>
         if (!ifExists) {
-          throw new IllegalArgumentException(
-            s"Cannot find field: ${names.head} in ${dataType.simpleString}")
+          throw new SparkIllegalArgumentException(
+            errorClass = "_LEGACY_ERROR_TEMP_3223",
+            messageParameters = Map("name" -> names.head, "dataType" -> 
dataType.simpleString))
         }
         None
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index e341cfdd99f7..c1ad705a59f0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -289,10 +289,6 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
         "ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)))
   }
 
-  def ansiIllegalArgumentError(e: IllegalArgumentException): 
IllegalArgumentException = {
-    ansiIllegalArgumentError(e.getMessage)
-  }
-
   def overflowInSumOfDecimalError(context: QueryContext): ArithmeticException 
= {
     arithmeticOverflowError("Overflow in sum of decimals", context = context)
   }
@@ -656,15 +652,11 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
   }
 
   def dataPathNotSpecifiedError(): SparkIllegalArgumentException = {
-    new SparkIllegalArgumentException(
-      errorClass = "_LEGACY_ERROR_TEMP_2047",
-      messageParameters = Map.empty)
+    new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_2047")
   }
 
   def createStreamingSourceNotSpecifySchemaError(): 
SparkIllegalArgumentException = {
-    new SparkIllegalArgumentException(
-      errorClass = "_LEGACY_ERROR_TEMP_2048",
-      messageParameters = Map.empty)
+    new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_2048")
   }
 
   def streamedOperatorUnsupportedByDataSourceError(
@@ -862,9 +854,7 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
   }
 
   def missingDatabaseLocationError(): SparkIllegalArgumentException = {
-    new SparkIllegalArgumentException(
-      errorClass = "_LEGACY_ERROR_TEMP_2068",
-      messageParameters = Map.empty)
+    new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_2068")
   }
 
   def cannotRemoveReservedPropertyError(property: String): 
SparkUnsupportedOperationException = {
@@ -977,9 +967,7 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
   }
 
   def nestedArraysUnsupportedError(): SparkIllegalArgumentException = {
-    new SparkIllegalArgumentException(
-      errorClass = "_LEGACY_ERROR_TEMP_2085",
-      messageParameters = Map.empty)
+    new SparkIllegalArgumentException("_LEGACY_ERROR_TEMP_2085")
   }
 
   def cannotTranslateNonNullValueForFieldError(pos: Int): 
SparkIllegalArgumentException = {
@@ -2719,4 +2707,12 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
         "codecName" -> codecName,
         "availableCodecs" -> availableCodecs))
   }
+
+  def partitionNumMismatchError(numFields: Int, schemaLen: Int): 
SparkIllegalArgumentException = {
+    new SparkIllegalArgumentException(
+      errorClass = "_LEGACY_ERROR_TEMP_3208",
+      messageParameters = Map(
+        "numFields" -> numFields.toString,
+        "schemaLen" -> schemaLen.toString))
+  }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index bf194a2288bb..562aac766fc3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst
 
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
 import org.apache.spark.sql.catalyst.plans.SQLHelper
@@ -104,47 +104,67 @@ class CatalystTypeConvertersSuite extends SparkFunSuite 
with SQLHelper {
 
   test("converting a wrong value to the struct type") {
     val structType = new StructType().add("f1", IntegerType)
-    val exception = intercept[IllegalArgumentException] {
-      CatalystTypeConverters.createToCatalystConverter(structType)("test")
-    }
-    assert(exception.getMessage.contains("The value (test) of the type "
-      + "(java.lang.String) cannot be converted to struct<f1:int>"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(structType)("test")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3219",
+      parameters = Map(
+        "other" -> "test",
+        "otherClass" -> "java.lang.String",
+        "dataType" -> "struct<f1:int>"))
   }
 
   test("converting a wrong value to the map type") {
     val mapType = MapType(StringType, IntegerType, false)
-    val exception = intercept[IllegalArgumentException] {
-      CatalystTypeConverters.createToCatalystConverter(mapType)("test")
-    }
-    assert(exception.getMessage.contains("The value (test) of the type "
-      + "(java.lang.String) cannot be converted to a map type with key "
-      + "type (string) and value type (int)"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(mapType)("test")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3221",
+      parameters = Map(
+        "other" -> "test",
+        "otherClass" -> "java.lang.String",
+        "keyType" -> "string",
+        "valueType" -> "int"))
   }
 
   test("converting a wrong value to the array type") {
     val arrayType = ArrayType(IntegerType, true)
-    val exception = intercept[IllegalArgumentException] {
-      CatalystTypeConverters.createToCatalystConverter(arrayType)("test")
-    }
-    assert(exception.getMessage.contains("The value (test) of the type "
-      + "(java.lang.String) cannot be converted to an array of int"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(arrayType)("test")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3220",
+      parameters = Map(
+        "other" -> "test",
+        "otherClass" -> "java.lang.String",
+        "elementType" -> "int"))
   }
 
   test("converting a wrong value to the decimal type") {
     val decimalType = DecimalType(10, 0)
-    val exception = intercept[IllegalArgumentException] {
-      CatalystTypeConverters.createToCatalystConverter(decimalType)("test")
-    }
-    assert(exception.getMessage.contains("The value (test) of the type "
-      + "(java.lang.String) cannot be converted to decimal(10,0)"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(decimalType)("test")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3219",
+      parameters = Map(
+        "other" -> "test",
+        "otherClass" -> "java.lang.String",
+        "dataType" -> "decimal(10,0)"))
   }
 
   test("converting a wrong value to the string type") {
-    val exception = intercept[IllegalArgumentException] {
-      CatalystTypeConverters.createToCatalystConverter(StringType)(0.1)
-    }
-    assert(exception.getMessage.contains("The value (0.1) of the type "
-      + "(java.lang.Double) cannot be converted to the string type"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        CatalystTypeConverters.createToCatalystConverter(StringType)(0.1)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3219",
+      parameters = Map(
+        "other" -> "0.1",
+        "otherClass" -> "java.lang.Double",
+        "dataType" -> "STRING"))
   }
 
   test("SPARK-24571: convert Char to String") {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala
index fcb10c98243d..c1c826d16cf9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtilsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.csv
 
 import org.scalatest.prop.TableDrivenPropertyChecks._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
 
 class CSVExprUtilsSuite extends SparkFunSuite {
   test("Can parse escaped characters") {
@@ -34,31 +34,39 @@ class CSVExprUtilsSuite extends SparkFunSuite {
   }
 
   test("Does not accept delimiter larger than one character") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVExprUtils.toChar("ab")
-    }
-    assert(exception.getMessage.contains("cannot be more than one character"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException]{
+        CSVExprUtils.toChar("ab")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3237",
+      parameters = Map("str" -> "ab"))
   }
 
   test("Throws exception for unsupported escaped characters") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVExprUtils.toChar("""\1""")
-    }
-    assert(exception.getMessage.contains("Unsupported special character for 
delimiter"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException]{
+        CSVExprUtils.toChar("""\1""")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3236",
+      parameters = Map("str" -> """\1"""))
   }
 
   test("string with one backward slash is prohibited") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVExprUtils.toChar("""\""")
-    }
-    assert(exception.getMessage.contains("Single backslash is prohibited"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException]{
+        CSVExprUtils.toChar("""\""")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3248",
+      parameters = Map.empty)
   }
 
   test("output proper error message for empty string") {
-    val exception = intercept[IllegalArgumentException]{
-      CSVExprUtils.toChar("")
-    }
-    assert(exception.getMessage.contains("Delimiter cannot be empty string"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException]{
+        CSVExprUtils.toChar("")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3247",
+      parameters = Map.empty)
   }
 
   val testCases = Table(
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
index 781c1c20783c..92cffe2152b4 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 import org.scalatest.PrivateMethodTester
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkException, SparkFunSuite, 
SparkIllegalArgumentException, SparkThrowable}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.internal.SQLConf
@@ -35,7 +36,8 @@ class TimeWindowSuite extends SparkFunSuite with 
ExpressionEvalHelper with Priva
     }
   }
 
-  private def checkErrorMessage[E <: Exception : ClassTag](msg: String, value: 
String): Unit = {
+  private def checkErrorMessage[E <: SparkThrowable : ClassTag](
+      parameters: Map[String, String], value: String): Unit = {
     val validDuration = "10 second"
     val validTime = "5 second"
     val e1 = intercept[E] {
@@ -48,25 +50,25 @@ class TimeWindowSuite extends SparkFunSuite with 
ExpressionEvalHelper with Priva
       TimeWindow(Literal(10L), validDuration, validDuration, value).startTime
     }
     Seq(e1, e2, e3).foreach { e =>
-      e.getMessage.contains(msg)
+      assert(e.getMessageParameters().asScala == parameters)
     }
   }
 
   test("blank intervals throw exception") {
-    for (blank <- Seq(null, " ", "\n", "\t")) {
+    for ((blank, i) <- Seq((null, "''"), (" ", "' '"), ("\n", "'\n'"), ("\t", 
"'\t'"))) {
       checkErrorMessage[AnalysisException](
-        "The window duration, slide duration and start time cannot be null or 
blank.", blank)
+        Map("intervalString" -> i), blank)
     }
   }
 
   test("invalid intervals throw exception") {
     checkErrorMessage[AnalysisException](
-      "did not correspond to a valid interval string.", "2 apples")
+      Map("intervalString" -> "'2 apples'"), "2 apples")
   }
 
   test("intervals greater than a month throws exception") {
-    checkErrorMessage[IllegalArgumentException](
-      "Intervals greater than or equal to a month is not supported (1 
month).", "1 month")
+    checkErrorMessage[SparkIllegalArgumentException](
+      Map("interval" -> "1 month"), "1 month")
   }
 
   test("interval strings work with and without 'interval' prefix and return 
microseconds") {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
index fd28a1decdb8..3aeb0c882ac3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 import org.scalatest.{Assertions, BeforeAndAfterEach}
 import org.scalatest.matchers.must.Matchers
 
-import org.apache.spark.TestUtils
+import org.apache.spark.{SparkIllegalArgumentException, TestUtils}
 import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -61,9 +61,10 @@ object BufferHolderSparkSubmitSuite extends Assertions {
 
     holder.reset()
 
-    assert(intercept[IllegalArgumentException] {
+    val e1 = intercept[SparkIllegalArgumentException] {
       holder.grow(-1)
-    }.getMessage.contains("because the size is negative"))
+    }
+    assert(e1.getErrorClass === "_LEGACY_ERROR_TEMP_3198")
 
     // while to reuse a buffer may happen, this test checks whether the buffer 
can be grown
     holder.grow(ARRAY_MAX / 2)
@@ -78,8 +79,9 @@ object BufferHolderSparkSubmitSuite extends Assertions {
     holder.grow(ARRAY_MAX - holder.totalSize())
     assert(unsafeRow.getSizeInBytes % 8 == 0)
 
-    assert(intercept[IllegalArgumentException] {
+    val e2 = intercept[SparkIllegalArgumentException] {
       holder.grow(ARRAY_MAX + 1 - holder.totalSize())
-    }.getMessage.contains("because the size after growing"))
+    }
+    assert(e2.getErrorClass === "_LEGACY_ERROR_TEMP_3199")
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
index 4e0f903a030a..79f03f23eb24 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
@@ -17,21 +17,29 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 
 class BufferHolderSuite extends SparkFunSuite {
 
   test("SPARK-16071 Check the size limit to avoid integer overflow") {
-    assert(intercept[UnsupportedOperationException] {
-      new BufferHolder(new UnsafeRow(Int.MaxValue / 8))
-    }.getMessage.contains("too many fields"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        new BufferHolder(new UnsafeRow(Int.MaxValue / 8))
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3130",
+      parameters = Map("numFields" -> "268435455"))
 
     val holder = new BufferHolder(new UnsafeRow(1000))
     holder.reset()
     holder.grow(1000)
-    assert(intercept[IllegalArgumentException] {
-      holder.grow(Integer.MAX_VALUE)
-    }.getMessage.contains("exceeds size limitation"))
+
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        holder.grow(Integer.MAX_VALUE)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3199",
+      parameters = Map("neededSize" -> "2147483647", "arrayMax" -> 
"2147483632")
+    )
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index c4f6e4d6ed0e..f68d485ac95f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkException, SparkFunSuite, 
SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
@@ -539,7 +539,13 @@ class DateTimeUtilsSuite extends SparkFunSuite with 
Matchers with SQLHelper {
     assert(dateAddInterval(input, new CalendarInterval(36, 0, 0)) === 
days(2000, 2, 28))
     assert(dateAddInterval(input, new CalendarInterval(36, 47, 0)) === 
days(2000, 4, 15))
     assert(dateAddInterval(input, new CalendarInterval(-13, 0, 0)) === 
days(1996, 1, 28))
-    intercept[IllegalArgumentException](dateAddInterval(input, new 
CalendarInterval(36, 47, 1)))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException](
+        dateAddInterval(input, new CalendarInterval(36, 47, 1))),
+      errorClass = "_LEGACY_ERROR_TEMP_2000",
+      parameters = Map(
+        "message" -> "Cannot add hours, minutes or seconds, milliseconds, 
microseconds to a date",
+        "ansiConfig" -> "\"spark.sql.ansi.enabled\""))
   }
 
   test("timestamp add interval") {
@@ -886,8 +892,18 @@ class DateTimeUtilsSuite extends SparkFunSuite with 
Matchers with SQLHelper {
   test("parsing day of week") {
     assert(getDayOfWeekFromString(UTF8String.fromString("THU")) == 0)
     assert(getDayOfWeekFromString(UTF8String.fromString("MONDAY")) == 4)
-    
intercept[IllegalArgumentException](getDayOfWeekFromString(UTF8String.fromString("xx")))
-    
intercept[IllegalArgumentException](getDayOfWeekFromString(UTF8String.fromString("\"quote")))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        getDayOfWeekFromString(UTF8String.fromString("xx"))
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3209",
+      parameters = Map("string" -> "xx"))
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        getDayOfWeekFromString(UTF8String.fromString("\"quote"))
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3209",
+      parameters = Map("string" -> "\"quote"))
   }
 
   test("SPARK-34761: timestamp add day-time interval") {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index a097e62f033b..dfed284bc2b9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -21,7 +21,7 @@ import java.util
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
 
 class CaseInsensitiveStringMapSuite extends SparkFunSuite {
 
@@ -63,9 +63,12 @@ class CaseInsensitiveStringMapSuite extends SparkFunSuite {
     assert(options.getBoolean("isBar", true))
     assert(!options.getBoolean("isBar", false))
 
-    intercept[IllegalArgumentException] {
-      options.getBoolean("FOO", true)
-    }
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        options.getBoolean("FOO", true)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3206",
+      parameters = Map("value" -> "bar"))
   }
 
   test("getLong") {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
index cc1c73b020d6..dd52ca716396 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, 
TableChange}
@@ -36,7 +37,7 @@ case class AlterTableExec(
     try {
       catalog.alterTable(ident, changes: _*)
     } catch {
-      case e: IllegalArgumentException =>
+      case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
         throw QueryExecutionErrors.unsupportedTableChangeError(e)
     }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out
index a30030bbaca9..507df161e8b4 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/date.sql.out
@@ -268,10 +268,9 @@ struct<>
 -- !query output
 org.apache.spark.SparkIllegalArgumentException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2000",
+  "errorClass" : "_LEGACY_ERROR_TEMP_3209",
   "messageParameters" : {
-    "ansiConfig" : "\"spark.sql.ansi.enabled\"",
-    "message" : "Illegal input for day of week: xx"
+    "string" : "xx"
   }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
index b28a72f7c201..5168480f84b0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.expressions
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
@@ -55,50 +55,74 @@ class ExpressionInfoSuite extends SparkFunSuite with 
SharedSparkSession {
       assert(info.getGroup === groupName)
     }
 
-    val errMsg = intercept[IllegalArgumentException] {
-      val invalidGroupName = "invalid_group_funcs"
-      new ExpressionInfo(
-        "testClass", null, "testName", null, "", "", "", invalidGroupName, "", 
"", "")
-    }.getMessage
-    assert(errMsg.contains("'group' is malformed in the expression 
[testName]."))
+    val validGroups = Seq(
+      "agg_funcs", "array_funcs", "binary_funcs", "bitwise_funcs", 
"collection_funcs",
+      "predicate_funcs", "conditional_funcs", "conversion_funcs", "csv_funcs", 
"datetime_funcs",
+      "generator_funcs", "hash_funcs", "json_funcs", "lambda_funcs", 
"map_funcs", "math_funcs",
+      "misc_funcs", "string_funcs", "struct_funcs", "window_funcs", 
"xml_funcs", "table_funcs",
+      "url_funcs", "variant_funcs").sorted
+    val invalidGroupName = "invalid_group_funcs"
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        new ExpressionInfo(
+          "testClass", null, "testName", null, "", "", "", invalidGroupName, 
"", "", "")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3202",
+      parameters = Map(
+        "exprName" -> "testName",
+        "group" -> invalidGroupName,
+        "validGroups" -> validGroups.mkString("[", ", ", "]")))
   }
 
   test("source in ExpressionInfo") {
     val info = 
spark.sessionState.catalog.lookupFunctionInfo(FunctionIdentifier("sum"))
     assert(info.getSource === "built-in")
 
-    Seq("python_udf", "java_udf", "scala_udf", "built-in", "hive").foreach { 
source =>
+    val validSources = Seq("built-in", "hive", "python_udf", "scala_udf", 
"java_udf", "python_udtf")
+    validSources.foreach { source =>
       val info = new ExpressionInfo(
         "testClass", null, "testName", null, "", "", "", "", "", "", source)
       assert(info.getSource === source)
     }
-    val errMsg = intercept[IllegalArgumentException] {
-      val invalidSource = "invalid_source"
-      new ExpressionInfo(
-        "testClass", null, "testName", null, "", "", "", "", "", "", 
invalidSource)
-    }.getMessage
-    assert(errMsg.contains("'source' is malformed in the expression 
[testName]."))
+    val invalidSource = "invalid_source"
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        new ExpressionInfo(
+          "testClass", null, "testName", null, "", "", "", "", "", "", 
invalidSource)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3203",
+      parameters = Map(
+        "exprName" -> "testName",
+        "source" -> invalidSource,
+        "validSources" -> validSources.sorted.mkString("[", ", ", "]")))
   }
 
   test("error handling in ExpressionInfo") {
-    val errMsg1 = intercept[IllegalArgumentException] {
-      val invalidNote = "  invalid note"
-      new ExpressionInfo("testClass", null, "testName", null, "", "", 
invalidNote, "", "", "", "")
-    }.getMessage
-    assert(errMsg1.contains("'note' is malformed in the expression 
[testName]."))
+    val invalidNote = "  invalid note"
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        new ExpressionInfo("testClass", null, "testName", null, "", "", 
invalidNote, "", "", "", "")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3201",
+      parameters = Map("exprName" -> "testName", "note" -> invalidNote))
 
-    val errMsg2 = intercept[IllegalArgumentException] {
-      val invalidSince = "-3.0.0"
-      new ExpressionInfo("testClass", null, "testName", null, "", "", "", "", 
invalidSince, "", "")
-    }.getMessage
-    assert(errMsg2.contains("'since' is malformed in the expression 
[testName]."))
+    val invalidSince = "-3.0.0"
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        new ExpressionInfo(
+          "testClass", null, "testName", null, "", "", "", "", invalidSince, 
"", "")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3204",
+      parameters = Map("since" -> invalidSince, "exprName" -> "testName"))
 
-    val errMsg3 = intercept[IllegalArgumentException] {
-      val invalidDeprecated = "  invalid deprecated"
-      new ExpressionInfo(
-        "testClass", null, "testName", null, "", "", "", "", "", 
invalidDeprecated, "")
-    }.getMessage
-    assert(errMsg3.contains("'deprecated' is malformed in the expression 
[testName]."))
+    val invalidDeprecated = "  invalid deprecated"
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        new ExpressionInfo(
+          "testClass", null, "testName", null, "", "", "", "", "", 
invalidDeprecated, "")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3205",
+      parameters = Map("exprName" -> "testName", "deprecated" -> 
invalidDeprecated))
   }
 
   test("using _FUNC_ instead of function names in examples") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to