This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ebd2b78f87fa [SPARK-46727][SQL] Port `classifyException()` in JDBC dialects on error classes ebd2b78f87fa is described below commit ebd2b78f87fa6086c41d5e6bcade5efeefac75d0 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Tue Jan 16 17:47:09 2024 +0300 [SPARK-46727][SQL] Port `classifyException()` in JDBC dialects on error classes ### What changes were proposed in this pull request? In the PR, I propose to port the existing `classifyException()` method which accepts a description to a new one w/ an error class added by https://github.com/apache/spark/pull/44358. The modified JDBC dialects are: DB2, H2, Oracle, MS SQL Server, MySQL and PostgreSQL. ### Why are the changes needed? The old method `classifyException()` which accepts only a `description` has already been deprecated by ... ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By existing integration tests, and the modified test suite: ``` $ build/sbt "test:testOnly *JDBCV2Suite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44739 from MaxGekk/port-jdbc-classifyException. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 4 +- .../org/apache/spark/sql/jdbc/DB2Dialect.scala | 17 +++++-- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 34 ++++++------- .../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 17 +++++-- .../org/apache/spark/sql/jdbc/MySQLDialect.scala | 29 ++++++----- .../apache/spark/sql/jdbc/PostgresDialect.scala | 56 ++++++++++++---------- .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 6 +-- 7 files changed, 91 insertions(+), 72 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala index d1d247967b4b..bae274788212 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala @@ -279,7 +279,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"CREATE index i1 ON $catalogName.new_table (col1)") }, errorClass = "INDEX_ALREADY_EXISTS", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) sql(s"DROP index i1 ON $catalogName.new_table") @@ -304,7 +304,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu sql(s"DROP index i1 ON $catalogName.new_table") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "i1", "tableName" -> "new_table") + parameters = Map("indexName" -> "`i1`", "tableName" -> "`new_table`") ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index d5a132c7dd48..f745e466ed9e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -144,16 +144,23 @@ private object DB2Dialect extends JdbcDialect { s"COMMENT ON SCHEMA ${quoteIdentifier(schema)} IS ''" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => throw NonEmptyNamespaceException( - namespace = Array.empty, details = message, cause = Some(e)) - case _ => super.classifyException(message, e) + case "42893" => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters, description) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index ae3a3addf7bf..cd151f790adf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -28,8 +28,7 @@ import scala.util.control.NonFatal import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.util.quoteNameParts +import 
org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.catalog.index.TableIndex @@ -195,7 +194,11 @@ private[sql] object H2Dialect extends JdbcDialect { (ident.namespace() :+ indexName).map(quoteIdentifier).mkString(".") } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case exception: SQLException => // Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html @@ -206,15 +209,16 @@ private[sql] object H2Dialect extends JdbcDialect { val regex = """"((?:[^"\\]|\\[\\"ntbrf])+)"""".r val name = regex.findFirstMatchIn(e.getMessage).get.group(1) val quotedName = org.apache.spark.sql.catalyst.util.quoteIdentifier(name) - throw new TableAlreadyExistsException(errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", + throw new TableAlreadyExistsException( + errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS", messageParameters = Map("relationName" -> quotedName), cause = Some(e)) // TABLE_OR_VIEW_NOT_FOUND_1 case 42102 => - val quotedName = quoteNameParts(UnresolvedAttribute.parseAttributeName(message)) + val relationName = messageParameters.getOrElse("tableName", "") throw new NoSuchTableException( errorClass = "TABLE_OR_VIEW_NOT_FOUND", - messageParameters = Map("relationName" -> quotedName), + messageParameters = Map("relationName" -> relationName), cause = Some(e)) // SCHEMA_NOT_FOUND_1 case 90079 => @@ -224,25 +228,21 @@ private[sql] object H2Dialect extends JdbcDialect { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> 
quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => // do nothing } case _ => // do nothing } - super.classifyException(message, e) + super.classifyException(e, errorClass, messageParameters, description) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 9776cff3f7c8..aaee6be24e61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -190,15 +190,22 @@ private object MsSqlServerDialect extends JdbcDialect { if (limit > 0) s"TOP ($limit)" else "" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + 
description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 3729 => throw NonEmptyNamespaceException( - namespace = Array.empty, details = message, cause = Some(e)) - case _ => super.classifyException(message, e) + case 3729 => + throw NonEmptyNamespaceException( + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters, description) } - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index dd74c93bc2e1..cbed1d1e6384 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -270,28 +270,27 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { indexMap.values.toArray } - override def classifyException(message: String, e: Throwable): AnalysisException = { + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1061 => - // The message is: Failed to create index indexName in tableName - val regex = "(?s)Failed to create index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) - throw new IndexAlreadyExistsException( - indexName = indexName, tableName = tableName, cause = Some(e)) - case 1091 => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val 
indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") + throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) + case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 901e66e5efcb..3eb065a5d4f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -225,42 +225,48 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { s"DROP INDEX ${quoteIdentifier(indexName)}" } - override def classifyException(message: String, e: Throwable): AnalysisException = { + // Message pattern defined by postgres specification + private final val pgAlreadyExistsRegex = """(?:.*)relation "(.*)" already exists""".r + + override def classifyException( + e: Throwable, + errorClass: String, + messageParameters: Map[String, String], + description: String): AnalysisException = { e match { case sqlException: SQLException => sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => - // Message patterns defined at caller 
sides of spark - val indexRegex = "(?s)Failed to create index (.*) in (.*)".r - val renameRegex = "(?s)Failed table renaming from (.*) to (.*)".r - // Message pattern defined by postgres specification - val pgRegex = """(?:.*)relation "(.*)" already exists""".r - - message match { - case indexRegex(index, table) => - throw new IndexAlreadyExistsException( - indexName = index, tableName = table, cause = Some(e)) - case renameRegex(_, newTable) => - throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - case _ if pgRegex.findFirstMatchIn(sqlException.getMessage).nonEmpty => - val tableName = pgRegex.findFirstMatchIn(sqlException.getMessage).get.group(1) - throw QueryCompilationErrors.tableAlreadyExistsError(tableName) - case _ => super.classifyException(message, e) + if (errorClass == "FAILED_JDBC.CREATE_INDEX") { + throw new IndexAlreadyExistsException( + indexName = messageParameters("indexName"), + tableName = messageParameters("tableName"), + cause = Some(e)) + } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { + val newTable = messageParameters("newName") + throw QueryCompilationErrors.tableAlreadyExistsError(newTable) + } else { + val tblRegexp = pgAlreadyExistsRegex.findFirstMatchIn(sqlException.getMessage) + if (tblRegexp.nonEmpty) { + throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) + } else { + super.classifyException(e, errorClass, messageParameters, description) + } } - case "42704" => - // The message is: Failed to drop index indexName in tableName - val regex = "(?s)Failed to drop index (.*) in (.*)".r - val indexName = regex.findFirstMatchIn(message).get.group(1) - val tableName = regex.findFirstMatchIn(message).get.group(2) + case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => + val indexName = messageParameters("indexName") + val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case "2BP01" => throw NonEmptyNamespaceException( - namespace = 
Array.empty, details = message, cause = Some(e)) - case _ => super.classifyException(message, e) + namespace = messageParameters.get("namespace").toArray, + details = sqlException.getMessage, + cause = Some(e)) + case _ => super.classifyException(e, errorClass, messageParameters, description) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(message, e) + case _ => super.classifyException(e, errorClass, messageParameters, description) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala index 05b3787d0ff2..a3990f3cfbb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala @@ -2980,8 +2980,8 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel }, errorClass = "INDEX_ALREADY_EXISTS", parameters = Map( - "indexName" -> "people_index", - "tableName" -> "test.people" + "indexName" -> "`people_index`", + "tableName" -> "`test`.`people`" ) ) assert(jdbcTable.indexExists("people_index")) @@ -2997,7 +2997,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel sql(s"DROP INDEX people_index ON TABLE h2.test.people") }, errorClass = "INDEX_NOT_FOUND", - parameters = Map("indexName" -> "people_index", "tableName" -> "test.people") + parameters = Map("indexName" -> "`people_index`", "tableName" -> "`test`.`people`") ) assert(jdbcTable.indexExists("people_index") == false) val indexes3 = jdbcTable.listIndexes() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org