This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 882ef6d [SPARK-36449][SQL] v2 ALTER TABLE REPLACE COLUMNS should check duplicates for the user specified columns 882ef6d is described below commit 882ef6dd73baefcd44ddc282a8db455477d41902 Author: Terry Kim <yumin...@gmail.com> AuthorDate: Tue Aug 10 13:20:29 2021 +0800 [SPARK-36449][SQL] v2 ALTER TABLE REPLACE COLUMNS should check duplicates for the user specified columns ### What changes were proposed in this pull request? Currently, v2 ALTER TABLE REPLACE COLUMNS does not check duplicates for the user specified columns. For example, ``` spark.sql(s"CREATE TABLE $t (id int) USING $v2Format") spark.sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data string)") ``` doesn't fail the analysis, and it's up to the catalog implementation to handle it. ### Why are the changes needed? To check the duplicate columns during analysis. ### Does this PR introduce _any_ user-facing change? Yes, now the above command will print out the following: ``` org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the user specified columns: `data` ``` ### How was this patch tested? Added new unit tests Closes #33676 from imback82/replace_cols_duplicates. 
Authored-by: Terry Kim <yumin...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit e1a5d9411733437b5a18045bbd18b48f7aa40f46) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/CheckAnalysis.scala | 25 ++++++++++++++-------- .../spark/sql/connector/AlterTableTests.scala | 11 ++++++++++ .../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++++++++++-- 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 043bf95..293e8c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -940,26 +940,33 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { * Validates the options used for alter table commands after table and columns are resolved. 
*/ private def checkAlterTableCommand(alter: AlterTableCommand): Unit = { - def checkColumnNotExists( - op: String, fieldNames: Seq[String], struct: StructType, r: Resolver): Unit = { - if (struct.findNestedField(fieldNames, includeCollections = true, r).isDefined) { + def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: StructType): Unit = { + if (struct.findNestedField( + fieldNames, includeCollections = true, alter.conf.resolver).isDefined) { alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " + s"already exists in ${struct.treeString}") } } + def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = { + SchemaUtils.checkColumnNameDuplication( + colsToAdd.map(_.name.quoted), + "in the user specified columns", + alter.conf.resolver) + } + alter match { case AddColumns(table: ResolvedTable, colsToAdd) => colsToAdd.foreach { colToAdd => - checkColumnNotExists("add", colToAdd.name, table.schema, alter.conf.resolver) + checkColumnNotExists("add", colToAdd.name, table.schema) } - SchemaUtils.checkColumnNameDuplication( - colsToAdd.map(_.name.quoted), - "in the user specified columns", - alter.conf.resolver) + checkColumnNameDuplication(colsToAdd) + + case ReplaceColumns(_: ResolvedTable, colsToAdd) => + checkColumnNameDuplication(colsToAdd) case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) => - checkColumnNotExists("rename", col.path :+ newName, table.schema, alter.conf.resolver) + checkColumnNotExists("rename", col.path :+ newName, table.schema) case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) => val fieldName = col.name.quoted diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index 1bd45f5..1b0898f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -1175,4 +1175,15 @@ trait AlterTableTests extends SharedSparkSession { StructField("col3", IntegerType).withComment("c3")))) } } + + test("SPARK-36449: Replacing columns with duplicate name should not be allowed") { + val t = s"${catalogAndNamespace}table_name" + withTable(t) { + sql(s"CREATE TABLE $t (data string) USING $v2Format") + val e = intercept[AnalysisException] { + sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data1 string, data string)") + } + assert(e.message.contains("Found duplicate column(s) in the user specified columns: `data`")) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala index 0cc8d05..f262cf1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition} -import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceTableAsSelect} +import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition @@ -316,8 +316,18 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession 
with AnalysisTes } } + test("SPARK-36449: Replacing columns with duplicate name should not be allowed") { + alterTableTest( + ReplaceColumns( + table, + Seq(QualifiedColType(None, "f", LongType, true, None, None), + QualifiedColType(None, "F", LongType, true, None, None))), + Seq("Found duplicate column(s) in the user specified columns: `f`"), + expectErrorOnCaseSensitive = false) + } + private def alterTableTest( - alter: AlterTableCommand, + alter: => AlterTableCommand, error: Seq[String], expectErrorOnCaseSensitive: Boolean = true): Unit = { Seq(true, false).foreach { caseSensitive => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org