This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee95ec35b4f [SPARK-40540][SQL] Migrate compilation errors onto error 
classes: _LEGACY_ERROR_TEMP_1100-1199
ee95ec35b4f is described below

commit ee95ec35b4f711fada4b62bc27281252850bb475
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Sep 28 08:52:44 2022 +0300

    [SPARK-40540][SQL] Migrate compilation errors onto error classes: 
_LEGACY_ERROR_TEMP_1100-1199
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to migrate 100 compilation errors onto temporary error 
classes with the prefix `_LEGACY_ERROR_TEMP_11xx`. The error messages will not 
include the error class names, so the existing behaviour is preserved.
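    
    To make the pattern concrete, here is one of the migrated helpers, excerpted 
from the diff below, before and after (the message text moves into 
`core/src/main/resources/error/error-classes.json` as the 
`_LEGACY_ERROR_TEMP_1130` template "Path does not exist: <path>."):
    ```
    // Before: the message text is hard-coded in QueryCompilationErrors.
    def dataPathNotExistError(path: String): Throwable = {
      new AnalysisException(s"Path does not exist: $path")
    }

    // After: the text lives in error-classes.json; the helper only supplies
    // the error class name and the message parameters.
    def dataPathNotExistError(path: String): Throwable = {
      new AnalysisException(
        errorClass = "_LEGACY_ERROR_TEMP_1130",
        messageParameters = Map("path" -> path))
    }
    ```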
    
    ### Why are the changes needed?
    Migrating onto temporary error classes allows us to gather statistics about 
errors and to detect the most popular error classes. After that, we can 
prioritise the migration work.
    
    The new error class name prefix `_LEGACY_ERROR_TEMP_` proposed here marks the 
error as developer-facing rather than user-facing. Developers can still get the 
error class programmatically via the `SparkThrowable` interface and build error 
infrastructure on top of it, but end users won't see the error class in the 
message. This allows us to do the error migration very quickly, and we can 
refine the error classes and mark them as user-facing later (naming them 
properly, adding tests, etc.).
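    
    For example, error tooling could branch on the temporary error class roughly 
like this (a sketch, not part of this PR; it assumes a `SparkSession` named 
`spark` and a partitioned v2 table `testcat.ns.tbl`, and the error class shown 
is illustrative):
    ```
    import org.apache.spark.SparkThrowable

    try {
      // DESCRIBE ... PARTITION is unsupported for v2 tables.
      spark.sql("DESCRIBE TABLE testcat.ns.tbl PARTITION (p = 1)")
    } catch {
      case e: SparkThrowable =>
        // End users only see the legacy message text, but tooling can read
        // the class programmatically, e.g. "_LEGACY_ERROR_TEMP_1111".
        println(e.getErrorClass)
    }
    ```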
    
    ### Does this PR introduce _any_ user-facing change?
    No. The error messages should be almost the same by default.
    
    ### How was this patch tested?
    By running the modified test suites:
    ```
    $ build/sbt "core/testOnly *SparkThrowableSuite"
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt -Phive -Phive-thriftserver "test:testOnly 
*MetastoreDataSourcesSuite"
    $ build/sbt -Phive -Phive-thriftserver "test:testOnly 
*BucketedWriteWithHiveSupportSuite"
    $ build/sbt "test:testOnly org.apache.spark.sql.jdbc.JDBCWriteSuite"
    $ build/sbt "test:testOnly *.PersistedViewTestSuite"
    ```
    
    Closes #38000 from MaxGekk/legacy-error-temp-compliation-1100.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 R/pkg/tests/fulltests/test_sparkSQL.R              |  10 +-
 core/src/main/resources/error/error-classes.json   | 514 ++++++++++++++++++
 .../catalyst/analysis/AlreadyExistException.scala  |  12 +-
 .../CannotReplaceMissingTableException.scala       |   5 +-
 .../catalyst/analysis/NoSuchItemException.scala    |   6 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 594 +++++++++++++--------
 .../results/ansi/string-functions.sql.out          |  10 +-
 .../results/ceil-floor-with-scale-param.sql.out    |  64 ++-
 .../resources/sql-tests/results/extract.sql.out    |  92 +++-
 .../sql-tests/results/postgreSQL/int8.sql.out      |  22 +-
 .../sql-tests/results/postgreSQL/numeric.sql.out   |  10 +-
 .../sql-tests/results/show-tables.sql.out          |   7 +-
 .../sql-tests/results/string-functions.sql.out     |  10 +-
 .../results/table-valued-functions.sql.out         |  88 ++-
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala |  42 +-
 .../spark/sql/sources/BucketedWriteSuite.scala     |  30 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  41 +-
 17 files changed, 1249 insertions(+), 308 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index 68fa6aac8e7..2d55597b55f 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -622,7 +622,7 @@ test_that("read/write json files", {
 
     # Test errorifexists
     expect_error(write.df(df, jsonPath2, "json", mode = "errorifexists"),
-                 "analysis error - path file:.*already exists")
+                 "analysis error - Path file:.*already exists")
 
     # Test write.json
     jsonPath3 <- tempfile(pattern = "jsonPath3", fileext = ".json")
@@ -3963,13 +3963,13 @@ test_that("Call DataFrameWriter.save() API in Java 
without path and check argume
   expect_error(write.df(df, source = "csv"),
               "Error in save : illegal argument - Expected exactly one path to 
be specified")
   expect_error(write.json(df, jsonPath),
-              "Error in json : analysis error - path file:.*already exists")
+              "Error in json : analysis error - Path file:.*already exists")
   expect_error(write.text(df, jsonPath),
-              "Error in text : analysis error - path file:.*already exists")
+              "Error in text : analysis error - Path file:.*already exists")
   expect_error(write.orc(df, jsonPath),
-              "Error in orc : analysis error - path file:.*already exists")
+              "Error in orc : analysis error - Path file:.*already exists")
   expect_error(write.parquet(df, jsonPath),
-              "Error in parquet : analysis error - path file:.*already exists")
+              "Error in parquet : analysis error - Path file:.*already exists")
   expect_error(write.parquet(df, jsonPath, mode = 123), "mode should be 
character or omitted.")
 
   # Arguments checking in R side.
diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 9e909bcb6c9..48e90cc617d 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1720,5 +1720,519 @@
     "message" : [
       "<funcName>() doesn't support the <mode> mode. Acceptable modes are 
<permissiveMode> and <failFastMode>."
     ]
+  },
+  "_LEGACY_ERROR_TEMP_1100" : {
+    "message" : [
+      "The '<argName>' parameter of function '<funcName>' needs to be a 
<requiredType> literal."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1101" : {
+    "message" : [
+      "Invalid value for the '<argName>' parameter of function '<funcName>': 
<invalidValue>.<endingMsg>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1102" : {
+    "message" : [
+      "Literals of type '<field>' are currently not supported for the 
<srcDataType> type."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1103" : {
+    "message" : [
+      "Unsupported component type <clz> in arrays."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1104" : {
+    "message" : [
+      "The second argument should be a double literal."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1105" : {
+    "message" : [
+      "Field name should be String Literal, but it's <extraction>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1106" : {
+    "message" : [
+      "Can't extract value from <child>: need struct type but got <other>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1107" : {
+    "message" : [
+      "Table <table> declares <batchWrite> capability but <v2WriteClassName> 
is not an instance of <v1WriteClassName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1108" : {
+    "message" : [
+      "Delete by condition with subquery is not supported: <condition>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1109" : {
+    "message" : [
+      "Exec update failed: cannot translate expression to source filter: <f>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1110" : {
+    "message" : [
+      "Cannot delete from table <table> where <filters>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1111" : {
+    "message" : [
+      "DESCRIBE does not support partition for v2 tables."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1112" : {
+    "message" : [
+      "Table <table> cannot be replaced as it did not exist. Use CREATE OR 
REPLACE TABLE to create the table."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1113" : {
+    "message" : [
+      "Table <table> does not support <cmd>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1114" : {
+    "message" : [
+      "The streaming sources in a query do not have a common supported 
execution mode.",
+      "Sources support micro-batch: <microBatchSources>",
+      "Sources support continuous: <continuousSources>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1115" : {
+    "message" : [
+      "<msg>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1116" : {
+    "message" : [
+      "<msg>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1117" : {
+    "message" : [
+      "<sessionCatalog> requires a single-part namespace, but got <ns>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1118" : {
+    "message" : [
+      "<msg>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1119" : {
+    "message" : [
+      "<cmd> is not supported in JDBC catalog."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1120" : {
+    "message" : [
+      "Unsupported NamespaceChange <changes> in JDBC catalog."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1121" : {
+    "message" : [
+      "Table does not support <cmd>: <table>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1122" : {
+    "message" : [
+      "Table <table> is not a row-level operation table."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1123" : {
+    "message" : [
+      "Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1124" : {
+    "message" : [
+      "<cmd> is not supported for v2 tables."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1125" : {
+    "message" : [
+      "Database from v1 session catalog is not specified."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1126" : {
+    "message" : [
+      "Nested databases are not supported by v1 session catalog: <catalog>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1127" : {
+    "message" : [
+      "Invalid partitionExprs specified: <sortOrders> For range partitioning 
use REPARTITION_BY_RANGE instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1128" : {
+    "message" : [
+      "Failed to resolve the schema for <format> for the partition column: 
<partitionColumn>. It must be specified manually."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1129" : {
+    "message" : [
+      "Unable to infer schema for <format>. It must be specified manually."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1130" : {
+    "message" : [
+      "Path does not exist: <path>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1131" : {
+    "message" : [
+      "Data source <className> does not support <outputMode> output mode."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1132" : {
+    "message" : [
+      "A schema needs to be specified when using <className>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1133" : {
+    "message" : [
+      "The user-specified schema doesn't match the actual schema:",
+      "user-specified: <schema>, actual: <actualSchema>. If you're using",
+      "DataFrameReader.schema API or creating a table, please do not specify 
the schema.",
+      "Or if you're scanning an existed table, please drop it and re-create 
it."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1134" : {
+    "message" : [
+      "Unable to infer schema for <format> at <fileCatalog>. It must be 
specified manually."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1135" : {
+    "message" : [
+      "<className> is not a valid Spark SQL Data Source."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1136" : {
+    "message" : [
+      "Cannot save interval data type into external storage."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1137" : {
+    "message" : [
+      "Unable to resolve <name> given [<outputStr>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1138" : {
+    "message" : [
+      "Hive built-in ORC data source must be used with Hive support enabled. 
Please use the native ORC data source by setting 'spark.sql.orc.impl' to 
'native'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1139" : {
+    "message" : [
+      "Failed to find data source: <provider>. Avro is built-in but external 
data source module since Spark 2.4. Please deploy the application as per the 
deployment section of Apache Avro Data Source Guide."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1140" : {
+    "message" : [
+      "Failed to find data source: <provider>. Please deploy the application 
as per the deployment section of Structured Streaming + Kafka Integration 
Guide."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1141" : {
+    "message" : [
+      "Multiple sources found for <provider> (<sourceNames>), please specify 
the fully qualified class name."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1142" : {
+    "message" : [
+      "Datasource does not support writing empty or nested empty schemas. 
Please make sure the data schema has at least one or more column(s)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1143" : {
+    "message" : [
+      "The data to be inserted needs to have the same number of columns as the 
target table: target table has <targetSize> column(s) but the inserted data has 
<actualSize> column(s), which contain <staticPartitionsSize> partition 
column(s) having assigned constant values."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1144" : {
+    "message" : [
+      "The data to be inserted needs to have the same number of partition 
columns as the target table: target table has <targetSize> partition column(s) 
but the inserted data has <providedPartitionsSize> partition columns specified."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1145" : {
+    "message" : [
+      "<partKey> is not a partition column. Partition columns are 
<partitionColumns>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1146" : {
+    "message" : [
+      "Partition column <partColumn> have multiple values specified, <values>. 
Please only specify a single value."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1147" : {
+    "message" : [
+      "The ordering of partition columns is <partColumns>. All partition 
columns having constant values need to appear before other partition columns 
that do not have an assigned constant value."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1148" : {
+    "message" : [
+      "Can only write data to relations with a single path."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1149" : {
+    "message" : [
+      "Fail to rebuild expression: missing key <filter> in 
`translatedFilterToExpr`."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1150" : {
+    "message" : [
+      "Column `<field>` has a data type of <fieldType>, which is not supported 
by <format>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1151" : {
+    "message" : [
+      "Fail to resolve data source for the table <table> since the table serde 
property has the duplicated key <key> with extra options specified for this 
scan operation. To fix this, you can rollback to the legacy behavior of 
ignoring the extra options by setting the config <config> to `false`, or 
address the conflicts of the same config."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1152" : {
+    "message" : [
+      "Path <outputPath> already exists."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1153" : {
+    "message" : [
+      "Cannot use <field> for partition column."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1154" : {
+    "message" : [
+      "Cannot use all columns for partition columns."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1155" : {
+    "message" : [
+      "Partition column `<col>` not found in schema <schemaCatalog>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1156" : {
+    "message" : [
+      "Column <colName> not found in schema <tableSchema>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1157" : {
+    "message" : [
+      "Unsupported data source type for direct query on files: <className>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1158" : {
+    "message" : [
+      "Saving data into a view is not allowed."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1159" : {
+    "message" : [
+      "The format of the existing table <tableName> is `<existingProvider>`. 
It doesn't match the specified format `<specifiedProvider>`."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1160" : {
+    "message" : [
+      "The location of the existing table <identifier> is 
`<existingTableLoc>`. It doesn't match the specified location `<tableDescLoc>`."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1161" : {
+    "message" : [
+      "The column number of the existing table <tableName> 
(<existingTableSchema>) doesn't match the data schema (<querySchema>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1162" : {
+    "message" : [
+      "Cannot resolve '<col>' given input columns: [<inputColumns>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1163" : {
+    "message" : [
+      "Specified partitioning does not match that of the existing table 
<tableName>.",
+      "Specified partition columns: [<specifiedPartCols>]",
+      "Existing partition columns: [<existingPartCols>]"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1164" : {
+    "message" : [
+      "Specified bucketing does not match that of the existing table 
<tableName>.",
+      "Specified bucketing: <specifiedBucketString>",
+      "Existing bucketing: <existingBucketString>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1165" : {
+    "message" : [
+      "It is not allowed to specify partitioning when the table schema is not 
defined."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1166" : {
+    "message" : [
+      "Bucketing column '<bucketCol>' should not be part of partition columns 
'<normalizedPartCols>'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1167" : {
+    "message" : [
+      "Bucket sorting column '<sortCol>' should not be part of partition 
columns '<normalizedPartCols>'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1168" : {
+    "message" : [
+      "<tableName> requires that the data to be inserted have the same number 
of columns as the target table: target table has <targetColumns> column(s) but 
the inserted data has <insertedColumns> column(s), including <staticPartCols> 
partition column(s) having constant value(s)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1169" : {
+    "message" : [
+      "Requested partitioning does not match the table $tableName:",
+      "Requested partitions: <normalizedPartSpec>",
+      "Table partitions: <partColNames>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1170" : {
+    "message" : [
+      "Hive support is required to <detail>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1171" : {
+    "message" : [
+      "createTableColumnTypes option column <col> not found in schema 
<schema>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1172" : {
+    "message" : [
+      "Parquet type not yet supported: <parquetType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1173" : {
+    "message" : [
+      "Illegal Parquet type: <parquetType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1174" : {
+    "message" : [
+      "Unrecognized Parquet type: <field>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1175" : {
+    "message" : [
+      "Unsupported data type <dataType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1176" : {
+    "message" : [
+      "The SQL query of view <viewName> has an incompatible schema change and 
column <colName> cannot be resolved. Expected <expectedNum> columns named 
<colName> but got <actualCols>.",
+      "Please try to re-create the view by running: <viewDDL>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1177" : {
+    "message" : [
+      "The SQL query of view <viewName> has an incompatible schema change and 
column <colName> cannot be resolved. Expected <expectedNum> columns named 
<colName> but got <actualCols>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1178" : {
+    "message" : [
+      "The number of partitions can't be specified with unspecified 
distribution. Invalid writer requirements detected."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1179" : {
+    "message" : [
+      "Table-valued function $name with alternatives: <usage>",
+      "cannot be applied to (<arguments>): <details>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1180" : {
+    "message" : [
+      "Incompatible input data type.",
+      "Expected: <expectedDataType>; Found: <foundDataType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1181" : {
+    "message" : [
+      "Stream-stream join without equality predicate is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1182" : {
+    "message" : [
+      "Column <ambiguousAttrs> are ambiguous. It's probably because you joined 
several Datasets together, and some of these Datasets are the same. This column 
points to one of the Datasets but Spark is unable to figure out which one. 
Please alias the Datasets with different names via `Dataset.as` before joining 
them, and specify the column using qualified name, e.g. 
`df.as(\"a\").join(df.as(\"b\"), $\"a.id\" > $\"b.id\")`. You can also set 
<config> to false to disable this check."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1183" : {
+    "message" : [
+      "Cannot use interval type in the table schema."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1184" : {
+    "message" : [
+      "Catalog <plugin> does not support <ability>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1185" : {
+    "message" : [
+      "<quoted> is not a valid <identifier> as it has more than 2 name parts."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1186" : {
+    "message" : [
+      "Multi-part identifier cannot be empty."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1187" : {
+    "message" : [
+      "Hive data source can only be used with tables, you can not <operation> 
files of Hive data source directly."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1188" : {
+    "message" : [
+      "There is a 'path' option set and <method>() is called with a path 
parameter. Either remove the path option, or call <method>() without the 
parameter. To ignore this check, set '<config>' to 'true'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1189" : {
+    "message" : [
+      "User specified schema not supported with `<operation>`."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1190" : {
+    "message" : [
+      "Temporary view <viewName> doesn't support streaming write."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1191" : {
+    "message" : [
+      "Streaming into views <viewName> is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1192" : {
+    "message" : [
+      "The input source(<source>) is different from the table <tableName>'s 
data source provider(<provider>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1193" : {
+    "message" : [
+      "Table <tableName> doesn't support streaming write - <t>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1194" : {
+    "message" : [
+      "queryName must be specified for memory sink."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1195" : {
+    "message" : [
+      "'<source>' is not supported with continuous trigger."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1196" : {
+    "message" : [
+      "<columnType> column <columnName> not found in existing columns 
(<validColumnNames>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1197" : {
+    "message" : [
+      "'<operation>' does not support partitioning."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1198" : {
+    "message" : [
+      "Function '<unbound>' cannot process input: (<arguments>): <unsupported>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1199" : {
+    "message" : [
+      "Invalid bound function '<bound>: there are <argsLen> arguments but 
<inputTypesLen> parameters returned from 'inputTypes()'."
+    ]
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index fb177251a73..04699dc5696 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -31,14 +31,22 @@ import org.apache.spark.sql.types.StructType
 class DatabaseAlreadyExistsException(db: String)
   extends NamespaceAlreadyExistsException(s"Database '$db' already exists")
 
-class NamespaceAlreadyExistsException(message: String) extends 
AnalysisException(message) {
+class NamespaceAlreadyExistsException(message: String)
+  extends AnalysisException(
+    message,
+    errorClass = Some("_LEGACY_ERROR_TEMP_1118"),
+    messageParameters = Map("msg" -> message)) {
   def this(namespace: Array[String]) = {
     this(s"Namespace '${namespace.quoted}' already exists")
   }
 }
 
 class TableAlreadyExistsException(message: String, cause: Option[Throwable] = 
None)
-  extends AnalysisException(message, cause = cause) {
+  extends AnalysisException(
+    message,
+    errorClass = Some("_LEGACY_ERROR_TEMP_1116"),
+    messageParameters = Map("msg" -> message),
+    cause = cause) {
   def this(db: String, table: String) = {
     this(s"Table or view '$table' already exists in database '$db'")
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala
index 311f7220345..7a52bc07d53 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala
@@ -25,5 +25,6 @@ class CannotReplaceMissingTableException(
     tableIdentifier: Identifier,
     cause: Option[Throwable] = None)
   extends AnalysisException(
-    s"Table $tableIdentifier cannot be replaced as it did not exist." +
-      s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)
+    errorClass = "_LEGACY_ERROR_TEMP_1112",
+    messageParameters = Map("table" -> tableIdentifier.toString),
+    cause = cause)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index 5c2c6d918a2..bf990afad6d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -45,7 +45,11 @@ case class NoSuchNamespaceException(
 case class NoSuchTableException(
     override val message: String,
     override val cause: Option[Throwable] = None)
-  extends AnalysisException(message, cause = cause) {
+  extends AnalysisException(
+    message,
+    errorClass = Some("_LEGACY_ERROR_TEMP_1115"),
+    messageParameters = Map("msg" -> message),
+    cause = cause) {
 
   def this(db: String, table: String) = {
     this(s"Table or view '$table' not found in database '$db'")
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cf7ce045bff..f1da38f1734 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkThrowableHelper
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, 
TableIdentifier}
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
NamespaceAlreadyExistsException, NoSuchFunctionException, 
NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, 
ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -1055,7 +1056,11 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   def requireLiteralParameter(
       funcName: String, argName: String, requiredType: String): Throwable = {
     new AnalysisException(
-      s"The '$argName' parameter of function '$funcName' needs to be a 
$requiredType literal.")
+      errorClass = "_LEGACY_ERROR_TEMP_1100",
+      messageParameters = Map(
+        "argName" -> argName,
+        "funcName" -> funcName,
+        "requiredType" -> requiredType))
   }
 
   def invalidStringLiteralParameter(
@@ -1064,32 +1069,49 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       invalidValue: String,
       allowedValues: Option[String] = None): Throwable = {
     val endingMsg = allowedValues.map(" " + _).getOrElse("")
-    new AnalysisException(s"Invalid value for the '$argName' parameter of 
function '$funcName': " +
-      s"$invalidValue.$endingMsg")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1101",
+      messageParameters = Map(
+        "argName" -> argName,
+        "funcName" -> funcName,
+        "invalidValue" -> invalidValue,
+        "endingMsg" -> endingMsg))
   }
 
   def literalTypeUnsupportedForSourceTypeError(field: String, source: 
Expression): Throwable = {
-    new AnalysisException(s"Literals of type '$field' are currently not 
supported " +
-      s"for the ${source.dataType.catalogString} type.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1102",
+      messageParameters = Map(
+        "field" -> field,
+        "srcDataType" -> source.dataType.catalogString))
   }
 
   def arrayComponentTypeUnsupportedError(clz: Class[_]): Throwable = {
-    new AnalysisException(s"Unsupported component type $clz in arrays")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1103",
+      messageParameters = Map("clz" -> clz.toString))
   }
 
   def secondArgumentNotDoubleLiteralError(): Throwable = {
-    new AnalysisException("The second argument should be a double literal.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1104",
+      messageParameters = Map.empty)
   }
 
   def dataTypeUnsupportedByExtractValueError(
       dataType: DataType, extraction: Expression, child: Expression): 
Throwable = {
-    val errorMsg = dataType match {
+    dataType match {
       case StructType(_) =>
-        s"Field name should be String Literal, but it's $extraction"
+        new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_1105",
+          messageParameters = Map("extraction" -> extraction.toString))
       case other =>
-        s"Can't extract value from $child: need struct type but got 
${other.catalogString}"
+        new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_1106",
+          messageParameters = Map(
+            "child" -> child.toString,
+            "other" -> other.catalogString))
     }
-    new AnalysisException(errorMsg)
   }
 
   def noHandlerForUDAFError(name: String): Throwable = {
@@ -1101,27 +1123,38 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   def batchWriteCapabilityError(
       table: Table, v2WriteClassName: String, v1WriteClassName: String): 
Throwable = {
     new AnalysisException(
-      s"Table ${table.name} declares ${TableCapability.V1_BATCH_WRITE} 
capability but " +
-        s"$v2WriteClassName is not an instance of $v1WriteClassName")
+      errorClass = "_LEGACY_ERROR_TEMP_1107",
+      messageParameters = Map(
+        "table" -> table.name,
+        "batchWrite" -> TableCapability.V1_BATCH_WRITE.toString,
+        "v2WriteClassName" -> v2WriteClassName,
+        "v1WriteClassName" -> v1WriteClassName))
   }
 
   def unsupportedDeleteByConditionWithSubqueryError(condition: Expression): 
Throwable = {
     new AnalysisException(
-      s"Delete by condition with subquery is not supported: $condition")
+      errorClass = "_LEGACY_ERROR_TEMP_1108",
+      messageParameters = Map("condition" -> condition.toString))
   }
 
   def cannotTranslateExpressionToSourceFilterError(f: Expression): Throwable = 
{
-    new AnalysisException("Exec update failed:" +
-      s" cannot translate expression to source filter: $f")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1109",
+      messageParameters = Map("f" -> f.toString))
   }
 
   def cannotDeleteTableWhereFiltersError(table: Table, filters: 
Array[Predicate]): Throwable = {
     new AnalysisException(
-      s"Cannot delete from table ${table.name} where ${filters.mkString("[", 
", ", "]")}")
+      errorClass = "_LEGACY_ERROR_TEMP_1110",
+      messageParameters = Map(
+        "table" -> table.name,
+        "filters" -> filters.mkString("[", ", ", "]")))
   }
 
   def describeDoesNotSupportPartitionForV2TablesError(): Throwable = {
-    new AnalysisException("DESCRIBE does not support partition for v2 tables.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1111",
+      messageParameters = Map.empty)
   }
 
   def cannotReplaceMissingTableError(
@@ -1135,7 +1168,11 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def unsupportedTableOperationError(table: Table, cmd: String): Throwable = {
-    new AnalysisException(s"Table ${table.name} does not support $cmd.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1113",
+      messageParameters = Map(
+        "table" -> table.name,
+        "cmd" -> cmd))
   }
 
   def unsupportedBatchReadError(table: Table): Throwable = {
@@ -1166,9 +1203,10 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       microBatchSources: Seq[String],
       continuousSources: Seq[String]): Throwable = {
     new AnalysisException(
-      "The streaming sources in a query do not have a common supported 
execution mode.\n" +
-        "Sources support micro-batch: " + microBatchSources.mkString(", ") + 
"\n" +
-        "Sources support continuous: " + continuousSources.mkString(", "))
+      errorClass = "_LEGACY_ERROR_TEMP_1114",
+      messageParameters = Map(
+        "microBatchSources" -> microBatchSources.mkString(", "),
+        "continuousSources" -> continuousSources.mkString(", ")))
   }
 
   def noSuchTableError(ident: Identifier): Throwable = {
@@ -1188,8 +1226,11 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def requiresSinglePartNamespaceError(ns: Seq[String]): Throwable = {
-    new AnalysisException(CatalogManager.SESSION_CATALOG_NAME +
-      " requires a single-part namespace, but got " + ns.mkString("[", ", ", 
"]"))
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1117",
+      messageParameters = Map(
+        "sessionCatalog" -> CatalogManager.SESSION_CATALOG_NAME,
+        "ns" -> ns.mkString("[", ", ", "]")))
   }
 
   def namespaceAlreadyExistsError(namespace: Array[String]): Throwable = {
@@ -1197,7 +1238,9 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   private def notSupportedInJDBCCatalog(cmd: String): Throwable = {
-    new AnalysisException(s"$cmd is not supported in JDBC catalog.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1119",
+      messageParameters = Map("cmd" -> cmd))
   }
 
   def cannotCreateJDBCTableUsingProviderError(): Throwable = {
@@ -1225,11 +1268,17 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def unsupportedJDBCNamespaceChangeInCatalogError(changes: 
Seq[NamespaceChange]): Throwable = {
-    new AnalysisException(s"Unsupported NamespaceChange $changes in JDBC 
catalog.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1120",
+      messageParameters = Map("changes" -> changes.toString()))
   }
 
   private def tableDoesNotSupportError(cmd: String, table: Table): Throwable = 
{
-    new AnalysisException(s"Table does not support $cmd: ${table.name}")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1121",
+      messageParameters = Map(
+        "cmd" -> cmd,
+        "table" -> table.name))
   }
 
   def tableDoesNotSupportReadsError(table: Table): Throwable = {
@@ -1257,16 +1306,21 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def tableIsNotRowLevelOperationTableError(table: Table): Throwable = {
-    throw new AnalysisException(s"Table ${table.name} is not a row-level 
operation table")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1122",
+      messageParameters = Map("table" -> table.name()))
   }
 
   def cannotRenameTableWithAlterViewError(): Throwable = {
-    new AnalysisException(
-      "Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1123",
+      messageParameters = Map.empty)
   }
 
   private def notSupportedForV2TablesError(cmd: String): Throwable = {
-    new AnalysisException(s"$cmd is not supported for v2 tables.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1124",
+      messageParameters = Map("cmd" -> cmd))
   }
 
   def analyzeTableNotSupportedForV2TablesError(): Throwable = {
@@ -1298,108 +1352,123 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
   }
 
   def databaseFromV1SessionCatalogNotSpecifiedError(): Throwable = {
-    new AnalysisException("Database from v1 session catalog is not specified")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1125",
+      messageParameters = Map.empty)
   }
 
   def nestedDatabaseUnsupportedByV1SessionCatalogError(catalog: String): 
Throwable = {
-    new AnalysisException(s"Nested databases are not supported by v1 session 
catalog: $catalog")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1126",
+      messageParameters = Map("catalog" -> catalog))
   }
 
   def invalidRepartitionExpressionsError(sortOrders: Seq[Any]): Throwable = {
-    new AnalysisException(s"Invalid partitionExprs specified: $sortOrders For 
range " +
-      "partitioning use REPARTITION_BY_RANGE instead.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1127",
+      messageParameters = Map("sortOrders" -> sortOrders.toString()))
   }
 
   def partitionColumnNotSpecifiedError(format: String, partitionColumn: 
String): Throwable = {
-    new AnalysisException(s"Failed to resolve the schema for $format for " +
-      s"the partition column: $partitionColumn. It must be specified 
manually.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1128",
+      messageParameters = Map(
+        "format" -> format,
+        "partitionColumn" -> partitionColumn))
   }
 
   def dataSchemaNotSpecifiedError(format: String): Throwable = {
-    new AnalysisException(s"Unable to infer schema for $format. It must be 
specified manually.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1129",
+      messageParameters = Map("format" -> format))
   }
 
   def dataPathNotExistError(path: String): Throwable = {
-    new AnalysisException(s"Path does not exist: $path")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1130",
+      messageParameters = Map("path" -> path))
   }
 
   def dataSourceOutputModeUnsupportedError(
       className: String, outputMode: OutputMode): Throwable = {
-    new AnalysisException(s"Data source $className does not support 
$outputMode output mode")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1131",
+      messageParameters = Map(
+        "className" -> className,
+        "outputMode" -> outputMode.toString))
   }
 
   def schemaNotSpecifiedForSchemaRelationProviderError(className: String): 
Throwable = {
-    new AnalysisException(s"A schema needs to be specified when using 
$className.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1132",
+      messageParameters = Map("className" -> className))
   }
 
   def userSpecifiedSchemaMismatchActualSchemaError(
       schema: StructType, actualSchema: StructType): Throwable = {
     new AnalysisException(
-      s"""
-         |The user-specified schema doesn't match the actual schema:
-         |user-specified: ${schema.toDDL}, actual: ${actualSchema.toDDL}. If 
you're using
-         |DataFrameReader.schema API or creating a table, please do not 
specify the schema.
-         |Or if you're scanning an existed table, please drop it and re-create 
it.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1133",
+      messageParameters = Map(
+        "schema" -> schema.toDDL,
+        "actualSchema" -> actualSchema.toDDL))
   }
 
   def dataSchemaNotSpecifiedError(format: String, fileCatalog: String): 
Throwable = {
     new AnalysisException(
-      s"Unable to infer schema for $format at $fileCatalog. It must be 
specified manually")
+      errorClass = "_LEGACY_ERROR_TEMP_1134",
+      messageParameters = Map(
+        "format" -> format,
+        "fileCatalog" -> fileCatalog))
   }
 
   def invalidDataSourceError(className: String): Throwable = {
-    new AnalysisException(s"$className is not a valid Spark SQL Data Source.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1135",
+      messageParameters = Map("className" -> className))
   }
 
   def cannotSaveIntervalIntoExternalStorageError(): Throwable = {
-    new AnalysisException("Cannot save interval data type into external 
storage.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1136",
+      messageParameters = Map.empty)
   }
 
   def cannotResolveAttributeError(name: String, outputStr: String): Throwable 
= {
     new AnalysisException(
-      s"Unable to resolve $name given [$outputStr]")
+      errorClass = "_LEGACY_ERROR_TEMP_1137",
+      messageParameters = Map("name" -> name, "outputStr" -> outputStr))
   }
 
   def orcNotUsedWithHiveEnabledError(): Throwable = {
     new AnalysisException(
-      s"""
-         |Hive built-in ORC data source must be used with Hive support enabled.
-         |Please use the native ORC data source by setting 
'spark.sql.orc.impl' to 'native'
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1138",
+      messageParameters = Map.empty)
   }
 
   def failedToFindAvroDataSourceError(provider: String): Throwable = {
     new AnalysisException(
-      s"""
-         |Failed to find data source: $provider. Avro is built-in but external 
data
-         |source module since Spark 2.4. Please deploy the application as per
-         |the deployment section of "Apache Avro Data Source Guide".
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1139",
+      messageParameters = Map("provider" -> provider))
   }
 
   def failedToFindKafkaDataSourceError(provider: String): Throwable = {
     new AnalysisException(
-      s"""
-         |Failed to find data source: $provider. Please deploy the application 
as
-         |per the deployment section of "Structured Streaming + Kafka 
Integration Guide".
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1140",
+      messageParameters = Map("provider" -> provider))
   }
 
   def findMultipleDataSourceError(provider: String, sourceNames: Seq[String]): 
Throwable = {
     new AnalysisException(
-      s"""
-         |Multiple sources found for $provider (${sourceNames.mkString(", ")}),
-         | please specify the fully qualified class name.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1141",
+      messageParameters = Map(
+        "provider" -> provider,
+        "sourceNames" -> sourceNames.mkString(", ")))
   }
 
   def writeEmptySchemasUnsupportedByDataSourceError(): Throwable = {
     new AnalysisException(
-      s"""
-         |Datasource does not support writing empty or nested empty schemas.
-         |Please make sure the data schema has at least one or more column(s).
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1142",
+      messageParameters = Map.empty)
   }
 
   def insertMismatchedColumnNumberError(
@@ -1407,119 +1476,132 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       sourceAttributes: Seq[Attribute],
       staticPartitionsSize: Int): Throwable = {
     new AnalysisException(
-      s"""
-         |The data to be inserted needs to have the same number of columns as 
the
-         |target table: target table has ${targetAttributes.size} column(s) 
but the
-         |inserted data has ${sourceAttributes.size + staticPartitionsSize} 
column(s),
-         |which contain $staticPartitionsSize partition column(s) having 
assigned
-         |constant values.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1143",
+      messageParameters = Map(
+        "targetSize" -> targetAttributes.size.toString,
+        "actualSize" -> (sourceAttributes.size + 
staticPartitionsSize).toString,
+        "staticPartitionsSize" -> staticPartitionsSize.toString))
   }
 
   def insertMismatchedPartitionNumberError(
       targetPartitionSchema: StructType,
       providedPartitionsSize: Int): Throwable = {
     new AnalysisException(
-      s"""
-         |The data to be inserted needs to have the same number of partition 
columns
-         |as the target table: target table has 
${targetPartitionSchema.fields.size}
-         |partition column(s) but the inserted data has $providedPartitionsSize
-         |partition columns specified.
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1144",
+      messageParameters = Map(
+        "targetSize" -> targetPartitionSchema.fields.size.toString,
+        "providedPartitionsSize" -> providedPartitionsSize.toString))
   }
 
   def invalidPartitionColumnError(
       partKey: String, targetPartitionSchema: StructType): Throwable = {
     new AnalysisException(
-      s"""
-         |$partKey is not a partition column. Partition columns are
-         |${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1145",
+      messageParameters = Map(
+        "partKey" -> partKey,
+        "partitionColumns" -> 
targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")))
   }
 
   def multiplePartitionColumnValuesSpecifiedError(
       field: StructField, potentialSpecs: Map[String, String]): Throwable = {
     new AnalysisException(
-      s"""
-         |Partition column ${field.name} have multiple values specified,
-         |${potentialSpecs.mkString("[", ", ", "]")}. Please only specify a 
single value.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1146",
+      messageParameters = Map(
+        "partColumn" -> field.name,
+        "values" -> potentialSpecs.mkString("[", ", ", "]")))
   }
 
   def invalidOrderingForConstantValuePartitionColumnError(
       targetPartitionSchema: StructType): Throwable = {
     new AnalysisException(
-      s"""
-         |The ordering of partition columns is
-         |${targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")}
-         |All partition columns having constant values need to appear before 
other
-         |partition columns that do not have an assigned constant value.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1147",
+      messageParameters = Map(
+        "partColumns" -> 
targetPartitionSchema.fields.map(_.name).mkString("[", ",", "]")))
   }
 
   def cannotWriteDataToRelationsWithMultiplePathsError(): Throwable = {
-    new AnalysisException("Can only write data to relations with a single 
path.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1148",
+      messageParameters = Map.empty)
   }
 
   def failedToRebuildExpressionError(filter: Filter): Throwable = {
     new AnalysisException(
-      s"Fail to rebuild expression: missing key $filter in 
`translatedFilterToExpr`")
+      errorClass = "_LEGACY_ERROR_TEMP_1149",
+      messageParameters = Map("filter" -> filter.toString))
   }
 
   def dataTypeUnsupportedByDataSourceError(format: String, field: 
StructField): Throwable = {
-    new AnalysisException(s"Column `${field.name}` has a data type of " +
-      s"${field.dataType.catalogString}, which is not supported by $format."
-    )
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1150",
+      messageParameters = Map(
+        "field" -> field.name,
+        "fieldType" -> field.dataType.catalogString,
+        "format" -> format))
   }
 
   def failToResolveDataSourceForTableError(table: CatalogTable, key: String): 
Throwable = {
     new AnalysisException(
-      s"""
-         |Fail to resolve data source for the table ${table.identifier} since 
the table
-         |serde property has the duplicated key $key with extra options 
specified for this
-         |scan operation. To fix this, you can rollback to the legacy behavior 
of ignoring
-         |the extra options by setting the config
-         |${SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key} to `false`, or address 
the
-         |conflicts of the same config.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1151",
+      messageParameters = Map(
+        "table" -> table.identifier.toString,
+        "key" -> key,
+        "config" -> SQLConf.LEGACY_EXTRA_OPTIONS_BEHAVIOR.key))
   }
 
   def outputPathAlreadyExistsError(outputPath: Path): Throwable = {
-    new AnalysisException(s"path $outputPath already exists.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1152",
+      messageParameters = Map("outputPath" -> outputPath.toString))
   }
 
   def cannotUseDataTypeForPartitionColumnError(field: StructField): Throwable 
= {
-    new AnalysisException(s"Cannot use ${field.dataType} for partition column")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1153",
+      messageParameters = Map("field" -> field.dataType.toString))
   }
 
   def cannotUseAllColumnsForPartitionColumnsError(): Throwable = {
-    new AnalysisException(s"Cannot use all columns for partition columns")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1154",
+      messageParameters = Map.empty)
   }
 
   def partitionColumnNotFoundInSchemaError(col: String, schemaCatalog: 
String): Throwable = {
-    new AnalysisException(s"Partition column `$col` not found in schema 
$schemaCatalog")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1155",
+      messageParameters = Map("col" -> col, "schemaCatalog" -> schemaCatalog))
   }
 
   def columnNotFoundInSchemaError(
       col: StructField, tableSchema: Option[StructType]): Throwable = {
-    new AnalysisException(s"""Column "${col.name}" not found in schema 
$tableSchema""")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1156",
+      messageParameters = Map(
+        "colName" -> col.name,
+        "tableSchema" -> tableSchema.toString))
   }
 
   def unsupportedDataSourceTypeForDirectQueryOnFilesError(className: String): 
Throwable = {
-    new AnalysisException(s"Unsupported data source type for direct query on 
files: $className")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1157",
+      messageParameters = Map("className" -> className))
   }
 
   def saveDataIntoViewNotAllowedError(): Throwable = {
-    new AnalysisException("Saving data into a view is not allowed.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1158",
+      messageParameters = Map.empty)
   }
 
   def mismatchedTableFormatError(
       tableName: String, existingProvider: Class[_], specifiedProvider: 
Class[_]): Throwable = {
     new AnalysisException(
-      s"""
-         |The format of the existing table $tableName is 
`${existingProvider.getSimpleName}`.
-         |It doesn't match the specified format 
`${specifiedProvider.getSimpleName}`.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1159",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "existingProvider" -> existingProvider.getSimpleName,
+        "specifiedProvider" -> specifiedProvider.getSimpleName))
   }
 
   def mismatchedTableLocationError(
@@ -1527,11 +1609,11 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       existingTable: CatalogTable,
       tableDesc: CatalogTable): Throwable = {
     new AnalysisException(
-      s"""
-         |The location of the existing table ${identifier.quotedString} is
-         |`${existingTable.location}`. It doesn't match the specified location
-         |`${tableDesc.location}`.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1160",
+      messageParameters = Map(
+        "identifier" -> identifier.quotedString,
+        "existingTableLoc" -> existingTable.location.toString,
+        "tableDescLoc" -> tableDesc.location.toString))
   }
 
   def mismatchedTableColumnNumberError(
@@ -1539,15 +1621,19 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       existingTable: CatalogTable,
       query: LogicalPlan): Throwable = {
     new AnalysisException(
-      s"""
-         |The column number of the existing table $tableName
-         |(${existingTable.schema.catalogString}) doesn't match the data schema
-         |(${query.schema.catalogString})
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1161",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "existingTableSchema" -> existingTable.schema.catalogString,
+        "querySchema" -> query.schema.catalogString))
   }
 
   def cannotResolveColumnGivenInputColumnsError(col: String, inputColumns: 
String): Throwable = {
-    new AnalysisException(s"cannot resolve '$col' given input columns: 
[$inputColumns]")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1162",
+      messageParameters = Map(
+        "col" -> col,
+        "inputColumns" -> inputColumns))
   }
 
   def mismatchedTablePartitionColumnError(
@@ -1555,11 +1641,11 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       specifiedPartCols: Seq[String],
       existingPartCols: String): Throwable = {
     new AnalysisException(
-      s"""
-         |Specified partitioning does not match that of the existing table 
$tableName.
-         |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
-         |Existing partition columns: [$existingPartCols]
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1163",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "specifiedPartCols" -> specifiedPartCols.mkString(", "),
+        "existingPartCols" -> existingPartCols))
   }
 
   def mismatchedTableBucketingError(
@@ -1567,37 +1653,46 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       specifiedBucketString: String,
       existingBucketString: String): Throwable = {
     new AnalysisException(
-      s"""
-         |Specified bucketing does not match that of the existing table 
$tableName.
-         |Specified bucketing: $specifiedBucketString
-         |Existing bucketing: $existingBucketString
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1164",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "specifiedBucketString" -> specifiedBucketString,
+        "existingBucketString" -> existingBucketString))
   }
 
   def specifyPartitionNotAllowedWhenTableSchemaNotDefinedError(): Throwable = {
-    new AnalysisException("It is not allowed to specify partitioning when the 
" +
-      "table schema is not defined.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1165",
+      messageParameters = Map.empty)
   }
 
   def bucketingColumnCannotBePartOfPartitionColumnsError(
       bucketCol: String, normalizedPartCols: Seq[String]): Throwable = {
-    new AnalysisException(s"bucketing column '$bucketCol' should not be part 
of " +
-      s"partition columns '${normalizedPartCols.mkString(", ")}'")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1166",
+      messageParameters = Map(
+        "bucketCol" -> bucketCol,
+        "normalizedPartCols" -> normalizedPartCols.mkString(", ")))
   }
 
   def bucketSortingColumnCannotBePartOfPartitionColumnsError(
     sortCol: String, normalizedPartCols: Seq[String]): Throwable = {
-    new AnalysisException(s"bucket sorting column '$sortCol' should not be 
part of " +
-      s"partition columns '${normalizedPartCols.mkString(", ")}'")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1167",
+      messageParameters = Map(
+        "sortCol" -> sortCol,
+        "normalizedPartCols" -> normalizedPartCols.mkString(", ")))
   }
 
   def mismatchedInsertedDataColumnNumberError(
       tableName: String, insert: InsertIntoStatement, staticPartCols: 
Set[String]): Throwable = {
     new AnalysisException(
-      s"$tableName requires that the data to be inserted have the same number 
of columns as " +
-        s"the target table: target table has ${insert.table.output.size} 
column(s) but the " +
-        s"inserted data has ${insert.query.output.length + 
staticPartCols.size} column(s), " +
-        s"including ${staticPartCols.size} partition column(s) having constant 
value(s).")
+      errorClass = "_LEGACY_ERROR_TEMP_1168",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "targetColumns" -> insert.table.output.size.toString,
+        "insertedColumns" -> (insert.query.output.length + 
staticPartCols.size).toString,
+        "staticPartCols" -> staticPartCols.size.toString))
   }
 
   def requestedPartitionsMismatchTablePartitionsError(
@@ -1605,37 +1700,48 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       normalizedPartSpec: Map[String, Option[String]],
       partColNames: StructType): Throwable = {
     new AnalysisException(
-      s"""
-         |Requested partitioning does not match the table $tableName:
-         |Requested partitions: ${normalizedPartSpec.keys.mkString(",")}
-         |Table partitions: ${partColNames.mkString(",")}
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1169",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "normalizedPartSpec" -> normalizedPartSpec.keys.mkString(","),
+        "partColNames" -> partColNames.mkString(",")))
   }
 
   def ddlWithoutHiveSupportEnabledError(detail: String): Throwable = {
-    new AnalysisException(s"Hive support is required to $detail")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1170",
+      messageParameters = Map("detail" -> detail))
   }
 
   def createTableColumnTypesOptionColumnNotFoundInSchemaError(
       col: String, schema: StructType): Throwable = {
     new AnalysisException(
-      s"createTableColumnTypes option column $col not found in schema 
${schema.catalogString}")
+      errorClass = "_LEGACY_ERROR_TEMP_1171",
+      messageParameters = Map("col" -> col, "schema" -> schema.catalogString))
   }
 
   def parquetTypeUnsupportedYetError(parquetType: String): Throwable = {
-    new AnalysisException(s"Parquet type not yet supported: $parquetType")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1172",
+      messageParameters = Map("parquetType" -> parquetType))
   }
 
   def illegalParquetTypeError(parquetType: String): Throwable = {
-    new AnalysisException(s"Illegal Parquet type: $parquetType")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1173",
+      messageParameters = Map("parquetType" -> parquetType))
   }
 
   def unrecognizedParquetTypeError(field: String): Throwable = {
-    new AnalysisException(s"Unrecognized Parquet type: $field")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1174",
+      messageParameters = Map("field" -> field))
   }
 
   def cannotConvertDataTypeToParquetTypeError(field: StructField): Throwable = {
-    new AnalysisException(s"Unsupported data type ${field.dataType.catalogString}")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1175",
+      messageParameters = Map("dataType" -> field.dataType.catalogString))
   }
 
   def incompatibleViewSchemaChange(
@@ -1644,32 +1750,59 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       expectedNum: Int,
       actualCols: Seq[Attribute],
       viewDDL: Option[String]): Throwable = {
-    new AnalysisException(s"The SQL query of view $viewName has an incompatible schema change " +
-      s"and column $colName cannot be resolved. Expected $expectedNum columns named $colName but " +
-      s"got ${actualCols.map(_.name).mkString("[", ",", "]")}" +
-      viewDDL.map(s => s"\nPlease try to re-create the view by running: $s").getOrElse(""))
+    viewDDL.map { v =>
+      new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_1176",
+        messageParameters = Map(
+          "viewName" -> viewName,
+          "colName" -> colName,
+          "expectedNum" -> expectedNum.toString,
+          "actualCols" -> actualCols.map(_.name).mkString("[", ",", "]"),
+          "viewDDL" -> v))
+    }.getOrElse {
+      new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_1177",
+        messageParameters = Map(
+          "viewName" -> viewName,
+          "colName" -> colName,
+          "expectedNum" -> expectedNum.toString,
+          "actualCols" -> actualCols.map(_.name).mkString("[", ",", "]")))
+    }
   }
 
   def numberOfPartitionsNotAllowedWithUnspecifiedDistributionError(): Throwable = {
-    throw new AnalysisException("The number of partitions can't be specified with unspecified" +
-      " distribution. Invalid writer requirements detected.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1178",
+      messageParameters = Map.empty)
   }
 
   def cannotApplyTableValuedFunctionError(
       name: String, arguments: String, usage: String, details: String = ""): Throwable = {
-    new AnalysisException(s"Table-valued function $name with alternatives: $usage\n" +
-      s"cannot be applied to ($arguments): $details")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1179",
+      messageParameters = Map(
+        "name" -> name,
+        "usage" -> usage,
+        "arguments" -> arguments,
+        "details" -> details))
   }
 
   def incompatibleRangeInputDataTypeError(
       expression: Expression, dataType: DataType): Throwable = {
-    new AnalysisException(s"Incompatible input data type. " +
-      s"Expected: ${dataType.typeName}; Found: 
${expression.dataType.typeName}")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1180",
+      messageParameters = Map(
+        "expectedDataType" -> dataType.typeName,
+        "foundDataType" -> expression.dataType.typeName))
   }
 
   def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan: LogicalPlan): Throwable = {
+    val errorClass = "_LEGACY_ERROR_TEMP_1181"
     new AnalysisException(
-      "Stream-stream join without equality predicate is not supported", plan = Some(plan))
+      SparkThrowableHelper.getMessage(errorClass, null, Map.empty[String, String]),
+      errorClass = Some(errorClass),
+      messageParameters = Map.empty,
+      plan = Some(plan))
   }
 
   def invalidPandasUDFPlacementError(
@@ -1683,15 +1816,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   def ambiguousAttributesInSelfJoinError(
       ambiguousAttrs: Seq[AttributeReference]): Throwable = {
     new AnalysisException(
-      s"""
-         |Column ${ambiguousAttrs.mkString(", ")} are ambiguous. It's probably 
because
-         |you joined several Datasets together, and some of these Datasets are 
the same.
-         |This column points to one of the Datasets but Spark is unable to 
figure out
-         |which one. Please alias the Datasets with different names via 
`Dataset.as`
-         |before joining them, and specify the column using qualified name, 
e.g.
-         |`df.as("a").join(df.as("b"), $$"a.id" > $$"b.id")`. You can also set
-         |${SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key} to false to disable 
this check.
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1182",
+      messageParameters = Map(
+        "ambiguousAttrs" -> ambiguousAttrs.mkString(", "),
+        "config" -> SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key))
   }
 
   def ambiguousColumnOrFieldError(
@@ -1714,74 +1842,108 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def cannotUseIntervalTypeInTableSchemaError(): Throwable = {
-    new AnalysisException("Cannot use interval type in the table schema.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1183",
+      messageParameters = Map.empty)
   }
 
   def missingCatalogAbilityError(plugin: CatalogPlugin, ability: String): Throwable = {
-    new AnalysisException(s"Catalog ${plugin.name} does not support $ability")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1184",
+      messageParameters = Map(
+        "plugin" -> plugin.name,
+        "ability" -> ability))
   }
 
   def identifierHavingMoreThanTwoNamePartsError(
       quoted: String, identifier: String): Throwable = {
-    new AnalysisException(s"$quoted is not a valid $identifier as it has more 
than 2 name parts.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1185",
+      messageParameters = Map(
+        "quoted" -> quoted,
+        "identifier" -> identifier))
   }
 
   def emptyMultipartIdentifierError(): Throwable = {
-    new AnalysisException("multi-part identifier cannot be empty.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1186",
+      messageParameters = Map.empty)
   }
 
   def cannotOperateOnHiveDataSourceFilesError(operation: String): Throwable = {
-    new AnalysisException("Hive data source can only be used with tables, you 
can not " +
-      s"$operation files of Hive data source directly.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1187",
+      messageParameters = Map("operation" -> operation))
   }
 
   def setPathOptionAndCallWithPathParameterError(method: String): Throwable = {
     new AnalysisException(
-      s"""
-         |There is a 'path' option set and $method() is called with a path
-         |parameter. Either remove the path option, or call $method() without the parameter.
-         |To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1188",
+      messageParameters = Map(
+        "method" -> method,
+        "config" -> SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key))
   }
 
   def userSpecifiedSchemaUnsupportedError(operation: String): Throwable = {
-    new AnalysisException(s"User specified schema not supported with 
`$operation`")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1189",
+      messageParameters = Map("operation" -> operation))
   }
 
   def tempViewNotSupportStreamingWriteError(viewName: String): Throwable = {
-    new AnalysisException(s"Temporary view $viewName doesn't support streaming 
write")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1190",
+      messageParameters = Map("viewName" -> viewName))
   }
 
   def streamingIntoViewNotSupportedError(viewName: String): Throwable = {
-    new AnalysisException(s"Streaming into views $viewName is not supported.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1191",
+      messageParameters = Map("viewName" -> viewName))
   }
 
   def inputSourceDiffersFromDataSourceProviderError(
       source: String, tableName: String, table: CatalogTable): Throwable = {
-    new AnalysisException(s"The input source($source) is different from the 
table " +
-      s"$tableName's data source provider(${table.provider.get}).")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1192",
+      messageParameters = Map(
+        "source" -> source,
+        "tableName" -> tableName,
+        "provider" -> table.provider.get))
   }
 
   def tableNotSupportStreamingWriteError(tableName: String, t: Table): Throwable = {
-    new AnalysisException(s"Table $tableName doesn't support streaming write - $t")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1193",
+      messageParameters = Map("tableName" -> tableName, "t" -> t.toString))
   }
 
   def queryNameNotSpecifiedForMemorySinkError(): Throwable = {
-    new AnalysisException("queryName must be specified for memory sink")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1194",
+      messageParameters = Map.empty)
   }
 
   def sourceNotSupportedWithContinuousTriggerError(source: String): Throwable = {
-    new AnalysisException(s"'$source' is not supported with continuous trigger")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1195",
+      messageParameters = Map("source" -> source))
   }
 
   def columnNotFoundInExistingColumnsError(
       columnType: String, columnName: String, validColumnNames: Seq[String]): Throwable = {
-    new AnalysisException(s"$columnType column $columnName not found in " +
-      s"existing columns (${validColumnNames.mkString(", ")})")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1196",
+      messageParameters = Map(
+        "columnType" -> columnType,
+        "columnName" -> columnName,
+        "validColumnNames" -> validColumnNames.mkString(", ")))
   }
 
   def operationNotSupportPartitioningError(operation: String): Throwable = {
-    new AnalysisException(s"'$operation' does not support partitioning")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1197",
+      messageParameters = Map("operation" -> operation))
   }
 
   def mixedRefsInAggFunc(funcStr: String, origin: Origin): Throwable = {
@@ -1796,16 +1958,24 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       unbound: UnboundFunction,
       arguments: Seq[Expression],
       unsupported: UnsupportedOperationException): Throwable = {
-    new AnalysisException(s"Function '${unbound.name}' cannot process " +
-      s"input: (${arguments.map(_.dataType.simpleString).mkString(", ")}): " +
-      unsupported.getMessage, cause = Some(unsupported))
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1198",
+      messageParameters = Map(
+        "unbound" -> unbound.name,
+        "arguments" -> arguments.map(_.dataType.simpleString).mkString(", "),
+        "unsupported" -> unsupported.getMessage),
+      cause = Some(unsupported))
   }
 
   def v2FunctionInvalidInputTypeLengthError(
       bound: BoundFunction,
       args: Seq[Expression]): Throwable = {
-    new AnalysisException(s"Invalid bound function '${bound.name()}: there are 
${args.length} " +
-        s"arguments but ${bound.inputTypes().length} parameters returned from 
'inputTypes()'")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1199",
+      messageParameters = Map(
+        "bound" -> bound.name(),
+        "argsLen" -> args.length.toString,
+        "inputTypesLen" -> bound.inputTypes().length.toString))
   }
 
   def ambiguousRelationAliasNameInNestedCTEError(name: String): Throwable = {
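
    For reference, the changes to QueryCompilationErrors.scala above all repeat one pattern: a JSON template in error-classes.json is paired with a parameter map at the throw site, and the helper no longer formats the message text itself. The sketch below shows the assembled (non-diff) form of one such pair; the JSON entry is an assumed shape, since the error-classes.json hunk is not reproduced in this excerpt:
    ```
    // Assumed shape of the error-classes.json entry (the real entries for
    // _LEGACY_ERROR_TEMP_1163..1199 are added in the error-classes.json hunk of this commit):
    //
    //   "_LEGACY_ERROR_TEMP_1166" : {
    //     "message" : [
    //       "bucketing column '<bucketCol>' should not be part of partition columns '<normalizedPartCols>'."
    //     ]
    //   }

    import org.apache.spark.sql.AnalysisException

    object ErrorMigrationSketch {
      // The helper only names the error class and supplies the values that fill the
      // template's placeholders; the rendered text stays the same as the old string.
      def bucketingColumnCannotBePartOfPartitionColumnsError(
          bucketCol: String,
          normalizedPartCols: Seq[String]): Throwable = {
        new AnalysisException(
          errorClass = "_LEGACY_ERROR_TEMP_1166",
          messageParameters = Map(
            "bucketCol" -> bucketCol,
            "normalizedPartCols" -> normalizedPartCols.mkString(", ")))
      }
    }
    ```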
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out
index 2379d5e35e7..1176042393b 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/string-functions.sql.out
@@ -1539,4 +1539,12 @@ select to_binary('abc', 'invalidFormat')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Invalid value for the 'format' parameter of function 'to_binary': invalidformat. The value has to be a case-insensitive string literal of 'hex', 'utf-8', 'utf8', or 'base64'.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1101",
+  "messageParameters" : {
+    "argName" : "format",
+    "endingMsg" : " The value has to be a case-insensitive string literal of 
'hex', 'utf-8', 'utf8', or 'base64'.",
+    "funcName" : "to_binary",
+    "invalidValue" : "invalidformat"
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/ceil-floor-with-scale-param.sql.out b/sql/core/src/test/resources/sql-tests/results/ceil-floor-with-scale-param.sql.out
index c1af10e5ec1..82ef294d943 100644
--- a/sql/core/src/test/resources/sql-tests/results/ceil-floor-with-scale-param.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ceil-floor-with-scale-param.sql.out
@@ -93,7 +93,21 @@ SELECT CEIL(2.5, null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-The 'scale' parameter of function 'ceil' needs to be a int literal.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1100",
+  "messageParameters" : {
+    "argName" : "scale",
+    "funcName" : "ceil",
+    "requiredType" : "int"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "CEIL(2.5, null)"
+  } ]
+}
 
 
 -- !query
@@ -102,7 +116,21 @@ SELECT CEIL(2.5, 'a')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-The 'scale' parameter of function 'ceil' needs to be a int literal.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1100",
+  "messageParameters" : {
+    "argName" : "scale",
+    "funcName" : "ceil",
+    "requiredType" : "int"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 21,
+    "fragment" : "CEIL(2.5, 'a')"
+  } ]
+}
 
 
 -- !query
@@ -222,7 +250,21 @@ SELECT FLOOR(2.5, null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-The 'scale' parameter of function 'floor' needs to be a int literal.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1100",
+  "messageParameters" : {
+    "argName" : "scale",
+    "funcName" : "floor",
+    "requiredType" : "int"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 23,
+    "fragment" : "FLOOR(2.5, null)"
+  } ]
+}
 
 
 -- !query
@@ -231,7 +273,21 @@ SELECT FLOOR(2.5, 'a')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-The 'scale' parameter of function 'floor' needs to be a int literal.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1100",
+  "messageParameters" : {
+    "argName" : "scale",
+    "funcName" : "floor",
+    "requiredType" : "int"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "FLOOR(2.5, 'a')"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/extract.sql.out b/sql/core/src/test/resources/sql-tests/results/extract.sql.out
index d631004d12b..1c40f623ea1 100644
--- a/sql/core/src/test/resources/sql-tests/results/extract.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/extract.sql.out
@@ -648,7 +648,20 @@ select date_part('not_supported', c) from t
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Literals of type 'not_supported' are currently not supported for the string type.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1102",
+  "messageParameters" : {
+    "field" : "not_supported",
+    "srcDataType" : "string"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 36,
+    "fragment" : "date_part('not_supported', c)"
+  } ]
+}
 
 
 -- !query
@@ -657,7 +670,21 @@ select date_part(c, c) from t
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-The 'field' parameter of function 'date_part' needs to be a string literal.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1100",
+  "messageParameters" : {
+    "argName" : "field",
+    "funcName" : "date_part",
+    "requiredType" : "string"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "date_part(c, c)"
+  } ]
+}
 
 
 -- !query
@@ -674,7 +701,21 @@ select date_part(i, i) from t
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-The 'field' parameter of function 'date_part' needs to be a string literal.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1100",
+  "messageParameters" : {
+    "argName" : "field",
+    "funcName" : "date_part",
+    "requiredType" : "string"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 22,
+    "fragment" : "date_part(i, i)"
+  } ]
+}
 
 
 -- !query
@@ -892,7 +933,20 @@ select date_part('DAY', interval '2-1' YEAR TO MONTH)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Literals of type 'DAY' are currently not supported for the interval year to month type.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1102",
+  "messageParameters" : {
+    "field" : "DAY",
+    "srcDataType" : "interval year to month"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 53,
+    "fragment" : "date_part('DAY', interval '2-1' YEAR TO MONTH)"
+  } ]
+}
 
 
 -- !query
@@ -901,7 +955,20 @@ select date_part('not_supported', interval '2-1' YEAR TO MONTH)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Literals of type 'not_supported' are currently not supported for the interval year to month type.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1102",
+  "messageParameters" : {
+    "field" : "not_supported",
+    "srcDataType" : "interval year to month"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 63,
+    "fragment" : "date_part('not_supported', interval '2-1' YEAR TO MONTH)"
+  } ]
+}
 
 
 -- !query
@@ -1023,7 +1090,20 @@ select date_part('not_supported', interval '123 12:34:56.789123123' DAY TO SECON
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Literals of type 'not_supported' are currently not supported for the interval day to second type.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1102",
+  "messageParameters" : {
+    "field" : "not_supported",
+    "srcDataType" : "interval day to second"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 82,
+    "fragment" : "date_part('not_supported', interval '123 12:34:56.789123123' 
DAY TO SECOND)"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/int8.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/int8.sql.out
index d1974a1c0d8..dd18f1d2e8c 100755
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/int8.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/int8.sql.out
@@ -852,12 +852,22 @@ SELECT * FROM range(bigint('+4567890123456789'), bigint('+4567890123456799'), 0)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Table-valued function range with alternatives: 
-    range(start: long, end: long, step: long, numSlices: integer)
-    range(start: long, end: long, step: long)
-    range(start: long, end: long)
-    range(end: long)
-cannot be applied to (long, long, integer): requirement failed: step (0) cannot be 0; line 1 pos 14
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1179",
+  "messageParameters" : {
+    "arguments" : "long, long, integer",
+    "details" : "requirement failed: step (0) cannot be 0",
+    "name" : "range",
+    "usage" : "\n    range(start: long, end: long, step: long, numSlices: 
integer)\n    range(start: long, end: long, step: long)\n    range(start: long, 
end: long)\n    range(end: long)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 80,
+    "fragment" : "range(bigint('+4567890123456789'), 
bigint('+4567890123456799'), 0)"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
index c4c24d5ed86..18226b0fd03 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
@@ -3827,7 +3827,15 @@ INSERT INTO num_result SELECT t1.id, t2.id, t1.val, t2.val, t1.val * t2.val
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-`spark_catalog`.`default`.`num_result` requires that the data to be inserted have the same number of columns as the target table: target table has 3 column(s) but the inserted data has 5 column(s), including 0 partition column(s) having constant value(s).
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1168",
+  "messageParameters" : {
+    "insertedColumns" : "5",
+    "staticPartCols" : "0",
+    "tableName" : "`spark_catalog`.`default`.`num_result`",
+    "targetColumns" : "3"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out b/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out
index 181d5854bad..f095f98fd84 100644
--- a/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out
@@ -209,7 +209,12 @@ SHOW TABLE EXTENDED LIKE 'show_t*' PARTITION(c='Us', d=1)
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-Table or view 'show_t*' not found in database 'showdb'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1115",
+  "messageParameters" : {
+    "msg" : "Table or view 'show_t*' not found in database 'showdb'"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out
index 70dc4362517..493fb3c34fc 100644
--- a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out
@@ -1471,4 +1471,12 @@ select to_binary('abc', 'invalidFormat')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Invalid value for the 'format' parameter of function 'to_binary': invalidformat. The value has to be a case-insensitive string literal of 'hex', 'utf-8', 'utf8', or 'base64'.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1101",
+  "messageParameters" : {
+    "argName" : "format",
+    "endingMsg" : " The value has to be a case-insensitive string literal of 
'hex', 'utf-8', 'utf8', or 'base64'.",
+    "funcName" : "to_binary",
+    "invalidValue" : "invalidformat"
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out
index c9e632f2dc5..95726367bee 100644
--- a/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/table-valued-functions.sql.out
@@ -67,12 +67,22 @@ select * from range(1, 1, 1, 1, 1)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Table-valued function range with alternatives: 
-    range(start: long, end: long, step: long, numSlices: integer)
-    range(start: long, end: long, step: long)
-    range(start: long, end: long)
-    range(end: long)
-cannot be applied to (integer, integer, integer, integer, integer): Invalid number of arguments for function range. Expected: one of 1, 2, 3 and 4; Found: 5.; line 1 pos 14
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1179",
+  "messageParameters" : {
+    "arguments" : "integer, integer, integer, integer, integer",
+    "details" : "Invalid number of arguments for function range. Expected: one 
of 1, 2, 3 and 4; Found: 5.",
+    "name" : "range",
+    "usage" : "\n    range(start: long, end: long, step: long, numSlices: 
integer)\n    range(start: long, end: long, step: long)\n    range(start: long, 
end: long)\n    range(end: long)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 34,
+    "fragment" : "range(1, 1, 1, 1, 1)"
+  } ]
+}
 
 
 -- !query
@@ -81,12 +91,22 @@ select * from range(1, null)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Table-valued function range with alternatives: 
-    range(start: long, end: long, step: long, numSlices: integer)
-    range(start: long, end: long, step: long)
-    range(start: long, end: long)
-    range(end: long)
-cannot be applied to (integer, void): Incompatible input data type. Expected: long; Found: void; line 1 pos 14
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1179",
+  "messageParameters" : {
+    "arguments" : "integer, void",
+    "details" : "Incompatible input data type.\nExpected: long; Found: void.",
+    "name" : "range",
+    "usage" : "\n    range(start: long, end: long, step: long, numSlices: 
integer)\n    range(start: long, end: long, step: long)\n    range(start: long, 
end: long)\n    range(end: long)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 28,
+    "fragment" : "range(1, null)"
+  } ]
+}
 
 
 -- !query
@@ -95,12 +115,22 @@ select * from range(array(1, 2, 3))
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Table-valued function range with alternatives: 
-    range(start: long, end: long, step: long, numSlices: integer)
-    range(start: long, end: long, step: long)
-    range(start: long, end: long)
-    range(end: long)
-cannot be applied to (array): Incompatible input data type. Expected: long; Found: array; line 1 pos 14
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1179",
+  "messageParameters" : {
+    "arguments" : "array",
+    "details" : "Incompatible input data type.\nExpected: long; Found: array.",
+    "name" : "range",
+    "usage" : "\n    range(start: long, end: long, step: long, numSlices: 
integer)\n    range(start: long, end: long, step: long)\n    range(start: long, 
end: long)\n    range(end: long)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 35,
+    "fragment" : "range(array(1, 2, 3))"
+  } ]
+}
 
 
 -- !query
@@ -109,12 +139,22 @@ select * from range(0, 5, 0)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Table-valued function range with alternatives: 
-    range(start: long, end: long, step: long, numSlices: integer)
-    range(start: long, end: long, step: long)
-    range(start: long, end: long)
-    range(end: long)
-cannot be applied to (integer, integer, integer): requirement failed: step (0) cannot be 0; line 1 pos 14
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1179",
+  "messageParameters" : {
+    "arguments" : "integer, integer, integer",
+    "details" : "requirement failed: step (0) cannot be 0",
+    "name" : "range",
+    "usage" : "\n    range(start: long, end: long, step: long, numSlices: 
integer)\n    range(start: long, end: long, step: long)\n    range(start: long, 
end: long)\n    range(end: long)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 28,
+    "fragment" : "range(0, 5, 0)"
+  } ]
+}
 
 
 -- !query
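
    The golden files above now record analysis errors in a structured JSON form (errorClass, messageParameters and, when position information is available, queryContext) instead of pre-rendered text. A minimal sketch of reading the same information programmatically, assuming a local SparkSession and that getErrorClass behaves on this branch as the golden files suggest:
    ```
    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object InspectErrorClassSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()
        try {
          // Analysis fails: the 'scale' argument of ceil must be an int literal.
          spark.sql("SELECT CEIL(2.5, 'a')").collect()
        } catch {
          case e: AnalysisException =>
            // The class is exposed through the SparkThrowable interface ...
            println(e.getErrorClass)  // _LEGACY_ERROR_TEMP_1100
            // ... while the user-facing text is still the legacy message.
            println(e.getMessage)
        } finally {
          spark.stop()
        }
      }
    }
    ```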
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 1a3e49186da..f842b873dd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -184,10 +184,15 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     df.write.jdbc(url, "TEST.APPENDTEST", new Properties())
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      val m = intercept[AnalysisException] {
-        df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new 
Properties())
-      }.getMessage
-      assert(m.contains("Column \"NAME\" not found"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new 
Properties())
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1156",
+        parameters = Map(
+          "colName" -> "NAME",
+          "tableSchema" ->
+            "Some(StructType(StructField(name,StringType,true),StructField(id,IntegerType,true)))"))
     }
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
@@ -211,12 +216,16 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
       assert(1 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count())
       assert(2 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
 
-      val m = intercept[AnalysisException] {
-        df3.write.mode(SaveMode.Overwrite).option("truncate", true)
-          .jdbc(url1, "TEST.TRUNCATETEST", properties)
-      }.getMessage
-      assert(m.contains("Column \"seq\" not found"))
-      assert(0 === spark.read.jdbc(url1, "TEST.TRUNCATETEST", 
properties).count())
+      checkError(
+        exception = intercept[AnalysisException] {
+          df3.write.mode(SaveMode.Overwrite).option("truncate", true)
+            .jdbc(url1, "TEST.TRUNCATETEST", properties)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1156",
+        parameters = Map(
+          "colName" -> "seq",
+          "tableSchema" ->
+            "Some(StructType(StructField(name,StringType,true),StructField(id,IntegerType,true)))"))
     } finally {
       JdbcDialects.unregisterDialect(testH2Dialect)
       JdbcDialects.registerDialect(H2Dialect)
@@ -240,10 +249,15 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     val df2 = spark.createDataFrame(sparkContext.parallelize(arr2x3), schema3)
 
     df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties())
-    val m = intercept[AnalysisException] {
-      df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new 
Properties())
-    }.getMessage
-    assert(m.contains("Column \"seq\" not found"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new 
Properties())
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1156",
+      parameters = Map(
+        "colName" -> "seq",
+        "tableSchema" ->
+          "Some(StructType(StructField(name,StringType,true),StructField(id,IntegerType,true)))"))
   }
 
   test("INSERT to JDBC Datasource") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index ae35f29c764..d68b8a3e4a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -238,19 +238,23 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils {
   }
 
   test("write bucketed data with the overlapping bucketBy/sortBy and 
partitionBy columns") {
-    val e1 = intercept[AnalysisException](df.write
-      .partitionBy("i", "j")
-      .bucketBy(8, "j", "k")
-      .sortBy("k")
-      .saveAsTable("bucketed_table"))
-    assert(e1.message.contains("bucketing column 'j' should not be part of 
partition columns"))
-
-    val e2 = intercept[AnalysisException](df.write
-      .partitionBy("i", "j")
-      .bucketBy(8, "k")
-      .sortBy("i")
-      .saveAsTable("bucketed_table"))
-    assert(e2.message.contains("bucket sorting column 'i' should not be part 
of partition columns"))
+    checkError(
+      exception = intercept[AnalysisException](df.write
+        .partitionBy("i", "j")
+        .bucketBy(8, "j", "k")
+        .sortBy("k")
+        .saveAsTable("bucketed_table")),
+      errorClass = "_LEGACY_ERROR_TEMP_1166",
+      parameters = Map("bucketCol" -> "j", "normalizedPartCols" -> "i, j"))
+
+    checkError(
+      exception = intercept[AnalysisException](df.write
+        .partitionBy("i", "j")
+        .bucketBy(8, "k")
+        .sortBy("i")
+        .saveAsTable("bucketed_table")),
+      errorClass = "_LEGACY_ERROR_TEMP_1167",
+      parameters = Map("sortCol" -> "i", "normalizedPartCols" -> "i, j"))
   }
 
   test("write bucketed data without partitionBy") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index ffd0e6b48af..7b2e772fa49 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1194,22 +1194,29 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("saveAsTable[append]: mismatch column names") {
     withTable("saveAsTable_mismatch_column_names") {
       Seq((1, 2)).toDF("i", 
"j").write.saveAsTable("saveAsTable_mismatch_column_names")
-      val e = intercept[AnalysisException] {
-        Seq((3, 4)).toDF("i", "k")
-          .write.mode("append").saveAsTable("saveAsTable_mismatch_column_names")
-      }
-      assert(e.getMessage.contains("cannot resolve"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq((3, 4)).toDF("i", "k")
+            .write.mode("append").saveAsTable("saveAsTable_mismatch_column_names")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1162",
+        parameters = Map("col" -> "j", "inputColumns" -> "i, k"))
     }
   }
 
   test("saveAsTable[append]: too many columns") {
     withTable("saveAsTable_too_many_columns") {
       Seq((1, 2)).toDF("i", 
"j").write.saveAsTable("saveAsTable_too_many_columns")
-      val e = intercept[AnalysisException] {
-        Seq((3, 4, 5)).toDF("i", "j", "k")
-          .write.mode("append").saveAsTable("saveAsTable_too_many_columns")
-      }
-      assert(e.getMessage.contains("doesn't match"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq((3, 4, 5)).toDF("i", "j", "k")
+            .write.mode("append").saveAsTable("saveAsTable_too_many_columns")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1161",
+        parameters = Map(
+          "tableName" -> "spark_catalog.default.saveastable_too_many_columns",
+          "existingTableSchema" -> "struct<i:int,j:int>",
+          "querySchema" -> "struct<i:int,j:int,k:int>"))
     }
   }
 
@@ -1285,11 +1292,15 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("saveAsTable[append]: less columns") {
     withTable("saveAsTable_less_columns") {
       Seq((1, 2)).toDF("i", "j").write.saveAsTable("saveAsTable_less_columns")
-      val e = intercept[AnalysisException] {
-        Seq((4)).toDF("j")
-          .write.mode("append").saveAsTable("saveAsTable_less_columns")
-      }
-      assert(e.getMessage.contains("doesn't match"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq(4).toDF("j").write.mode("append").saveAsTable("saveAsTable_less_columns")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1161",
+        parameters = Map(
+          "tableName" -> "spark_catalog.default.saveastable_less_columns",
+          "existingTableSchema" -> "struct<i:int,j:int>",
+          "querySchema" -> "struct<j:int>"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

