This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 882ef6d  [SPARK-36449][SQL] v2 ALTER TABLE REPLACE COLUMNS should 
check duplicates for the user specified columns
882ef6d is described below

commit 882ef6dd73baefcd44ddc282a8db455477d41902
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Tue Aug 10 13:20:29 2021 +0800

    [SPARK-36449][SQL] v2 ALTER TABLE REPLACE COLUMNS should check duplicates 
for the user specified columns
    
    ### What changes were proposed in this pull request?
    
    Currently, v2 ALTER TABLE REPLACE COLUMNS does not check duplicates for the 
user specified columns. For example,
    ```
    spark.sql(s"CREATE TABLE $t (id int) USING $v2Format")
    spark.sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data string)")
    ```
    doesn't fail the analysis, and it's up to the catalog implementation to 
handle it.
    
    ### Why are the changes needed?
    
    To check the duplicate columns during analysis.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now the above will command will print out the following:
    ```
    org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the 
user specified columns: `data`
    ```
    
    ### How was this patch tested?
    
    Added new unit tests
    
    Closes #33676 from imback82/replace_cols_duplicates.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit e1a5d9411733437b5a18045bbd18b48f7aa40f46)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 25 ++++++++++++++--------
 .../spark/sql/connector/AlterTableTests.scala      | 11 ++++++++++
 .../connector/V2CommandsCaseSensitivitySuite.scala | 14 ++++++++++--
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 043bf95..293e8c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -940,26 +940,33 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
    * Validates the options used for alter table commands after table and 
columns are resolved.
    */
   private def checkAlterTableCommand(alter: AlterTableCommand): Unit = {
-    def checkColumnNotExists(
-      op: String, fieldNames: Seq[String], struct: StructType, r: Resolver): 
Unit = {
-      if (struct.findNestedField(fieldNames, includeCollections = true, 
r).isDefined) {
+    def checkColumnNotExists(op: String, fieldNames: Seq[String], struct: 
StructType): Unit = {
+      if (struct.findNestedField(
+          fieldNames, includeCollections = true, 
alter.conf.resolver).isDefined) {
         alter.failAnalysis(s"Cannot $op column, because ${fieldNames.quoted} " 
+
           s"already exists in ${struct.treeString}")
       }
     }
 
+    def checkColumnNameDuplication(colsToAdd: Seq[QualifiedColType]): Unit = {
+      SchemaUtils.checkColumnNameDuplication(
+        colsToAdd.map(_.name.quoted),
+        "in the user specified columns",
+        alter.conf.resolver)
+    }
+
     alter match {
       case AddColumns(table: ResolvedTable, colsToAdd) =>
         colsToAdd.foreach { colToAdd =>
-          checkColumnNotExists("add", colToAdd.name, table.schema, 
alter.conf.resolver)
+          checkColumnNotExists("add", colToAdd.name, table.schema)
         }
-        SchemaUtils.checkColumnNameDuplication(
-          colsToAdd.map(_.name.quoted),
-          "in the user specified columns",
-          alter.conf.resolver)
+        checkColumnNameDuplication(colsToAdd)
+
+      case ReplaceColumns(_: ResolvedTable, colsToAdd) =>
+        checkColumnNameDuplication(colsToAdd)
 
       case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) 
=>
-        checkColumnNotExists("rename", col.path :+ newName, table.schema, 
alter.conf.resolver)
+        checkColumnNotExists("rename", col.path :+ newName, table.schema)
 
       case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, 
_, _) =>
         val fieldName = col.name.quoted
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 1bd45f5..1b0898f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -1175,4 +1175,15 @@ trait AlterTableTests extends SharedSparkSession {
         StructField("col3", IntegerType).withComment("c3"))))
     }
   }
+
+  test("SPARK-36449: Replacing columns with duplicate name should not be 
allowed") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (data string) USING $v2Format")
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t REPLACE COLUMNS (data string, data1 string, data 
string)")
+      }
+      assert(e.message.contains("Found duplicate column(s) in the user 
specified columns: `data`"))
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 0cc8d05..f262cf1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, 
CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, 
TestTable2, UnresolvedFieldName, UnresolvedFieldPosition}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, 
AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, 
QualifiedColType, RenameColumn, ReplaceTableAsSelect}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, 
AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, 
QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -316,8 +316,18 @@ class V2CommandsCaseSensitivitySuite extends 
SharedSparkSession with AnalysisTes
     }
   }
 
+  test("SPARK-36449: Replacing columns with duplicate name should not be 
allowed") {
+    alterTableTest(
+      ReplaceColumns(
+        table,
+        Seq(QualifiedColType(None, "f", LongType, true, None, None),
+          QualifiedColType(None, "F", LongType, true, None, None))),
+      Seq("Found duplicate column(s) in the user specified columns: `f`"),
+      expectErrorOnCaseSensitive = false)
+  }
+
   private def alterTableTest(
-      alter: AlterTableCommand,
+      alter: => AlterTableCommand,
       error: Seq[String],
       expectErrorOnCaseSensitive: Boolean = true): Unit = {
     Seq(true, false).foreach { caseSensitive =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to