This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 640ed4fad32f [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` column names duplication handling consistent with `withColumnRenamed`
640ed4fad32f is described below

commit 640ed4fad32f1042d564166ddca2609190fb6a96
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Mar 8 20:52:22 2024 +0900

    [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` column names duplication handling consistent with `withColumnRenamed`
    
    ### What changes were proposed in this pull request?
    Make `withColumnsRenamed` handle duplicated column names consistently with `withColumnRenamed`.
    
    ### Why are the changes needed?
    `withColumnsRenamed` checks for duplicated column names in the output dataframe, which is inconsistent with `withColumnRenamed`:
    1. `withColumnRenamed` doesn't perform this check, and supports outputting a dataframe with duplicated column names;
    2. when the input dataframe has duplicated column names, `withColumnsRenamed` always fails, even if the columns with the same name are not touched at all:
    
    ```
    In [8]: df1 = spark.createDataFrame([(1, "id2"),], ["id", "value"])
       ...: df2 = spark.createDataFrame([(1, 'x', 'id1'), ], ["id", 'a', "value"])
       ...: join = df2.join(df1, on=['id'], how='left')
       ...: join
    Out[8]: DataFrame[id: bigint, a: string, value: string, value: string]
    
    In [9]: join.withColumnRenamed('id', 'value')
    Out[9]: DataFrame[value: bigint, a: string, value: string, value: string]
    
    In [10]: join.withColumnsRenamed({'id' : 'value'})
    ...
    AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
    
    In [11]: join.withColumnRenamed('a', 'b')
    Out[11]: DataFrame[id: bigint, b: string, value: string, value: string]
    
    In [12]: join.withColumnsRenamed({'a' : 'b'})
    ...
    AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
    
    In [13]: join.withColumnRenamed('x', 'y')
    Out[13]: DataFrame[id: bigint, a: string, value: string, value: string]
    
    In [14]: join.withColumnsRenamed({'x' : 'y'})
    AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
    
    In [15]: join.withColumnRenamed('value', 'new_value')
    Out[15]: DataFrame[id: bigint, a: string, new_value: string, new_value: string]
    
    In [16]: join.withColumnsRenamed({'value' : 'new_value'})
    AnalysisException: [COLUMN_ALREADY_EXISTS] The column `new_value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711
    ```
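
    With this patch applied, both APIs agree. Below is a minimal sketch of the post-fix behavior, mirroring the new test added in this commit (it assumes an active `spark` session and the `join` dataframe built in the session above):

    ```
    # Hedged sketch: after this change, withColumnsRenamed no longer raises
    # COLUMN_ALREADY_EXISTS here and returns the same schema as withColumnRenamed.
    assert (
        join.withColumnRenamed("value", "new_value").columns
        == join.withColumnsRenamed({"value": "new_value"}).columns
    )
    # expected columns: ['id', 'a', 'new_value', 'new_value']
    ```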
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `withColumnsRenamed` no longer fails with `COLUMN_ALREADY_EXISTS` when the resulting dataframe contains duplicated column names; it now behaves like `withColumnRenamed`.
    
    ### How was this patch tested?
    Updated tests: added `test_with_columns_renamed_with_duplicated_names` and removed the tests that asserted the old duplication check.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45431 from zhengruifeng/connect_renames.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../connect/planner/SparkConnectProtoSuite.scala   |  9 -----
 python/pyspark/sql/tests/test_dataframe.py         | 22 ++++++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  3 --
 .../org/apache/spark/sql/DataFrameSuite.scala      | 39 ----------------------
 4 files changed, 22 insertions(+), 51 deletions(-)

diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index 1b50936d935a..b989f5027cf9 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -566,15 +566,6 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
     comparePlans(
       connectTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")),
       sparkTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")))
-
-    checkError(
-      exception = intercept[AnalysisException] {
-        transform(
-          connectTestRelation.withColumnsRenamed(
-            Map("id" -> "duplicatedCol", "name" -> "duplicatedCol")))
-      },
-      errorClass = "COLUMN_ALREADY_EXISTS",
-      parameters = Map("columnName" -> "`duplicatedcol`"))
   }
 
   test("Writes fails without path or table") {
diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index ad7a3b8dfc96..38310f12a4a7 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -231,6 +231,28 @@ class DataFrameTestsMixin:
             message_parameters={"arg_name": "colsMap", "arg_type": "tuple"},
         )
 
+    def test_with_columns_renamed_with_duplicated_names(self):
+        df1 = self.spark.createDataFrame([(1, "v1")], ["id", "value"])
+        df2 = self.spark.createDataFrame([(1, "x", "v2")], ["id", "a", "value"])
+        join = df2.join(df1, on=["id"], how="left")
+
+        self.assertEqual(
+            join.withColumnRenamed("id", "value").columns,
+            join.withColumnsRenamed({"id": "value"}).columns,
+        )
+        self.assertEqual(
+            join.withColumnRenamed("a", "b").columns,
+            join.withColumnsRenamed({"a": "b"}).columns,
+        )
+        self.assertEqual(
+            join.withColumnRenamed("value", "new_value").columns,
+            join.withColumnsRenamed({"value": "new_value"}).columns,
+        )
+        self.assertEqual(
+            join.withColumnRenamed("x", "y").columns,
+            join.withColumnsRenamed({"x": "y"}).columns,
+        )
+
     def test_ordering_of_with_columns_renamed(self):
         df = self.spark.range(10)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 189be1d6a30d..f3bf6119659d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2932,9 +2932,6 @@ class Dataset[T] private[sql](
           }
         )
     }
-    SchemaUtils.checkColumnNameDuplication(
-      projectList.map(_.name),
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
     withPlan(Project(projectList, logicalPlan))
   }
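
The removed `SchemaUtils.checkColumnNameDuplication` call was the output-name validation that made `withColumnsRenamed` stricter than `withColumnRenamed`; without it, the projection may legitimately carry duplicated names. A hedged sketch of the resulting behavior, assuming an active `spark` session:

```
# Hypothetical session illustrating the relaxed behavior; `join` carries
# a duplicated `value` column, as in the PR description above.
df1 = spark.createDataFrame([(1, "v1")], ["id", "value"])
df2 = spark.createDataFrame([(1, "x", "v2")], ["id", "a", "value"])
join = df2.join(df1, on=["id"], how="left")

# Renaming an untouched column no longer trips the duplication check.
print(join.withColumnsRenamed({"a": "b"}).columns)
# expected: ['id', 'b', 'value', 'value']
```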
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1dc367fa6bf6..6b34a6412cc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -789,45 +789,6 @@ class DataFrameSuite extends QueryTest
       assert(df.columns === Array("key", "value", "renamed1", "renamed2"))
   }
 
-  test("SPARK-40311: withColumnsRenamed case sensitive") {
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      val df = testData.toDF().withColumns(Seq("newCol1", "newCOL2"),
-        Seq(col("key") + 1, col("key") + 2))
-        .withColumnsRenamed(Map("newCol1" -> "renamed1", "newCol2" -> "renamed2"))
-      checkAnswer(
-        df,
-        testData.collect().map { case Row(key: Int, value: String) =>
-          Row(key, value, key + 1, key + 2)
-        }.toSeq)
-      assert(df.columns === Array("key", "value", "renamed1", "newCOL2"))
-    }
-  }
-
-  test("SPARK-40311: withColumnsRenamed duplicate column names simple") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "renamed"))
-      },
-      errorClass = "COLUMN_ALREADY_EXISTS",
-      parameters = Map("columnName" -> "`renamed`"))
-  }
-
-  test("SPARK-40311: withColumnsRenamed duplicate column names simple case 
sensitive") {
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      val df = person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "Renamed"))
-      assert(df.columns === Array("renamed", "Renamed", "age"))
-    }
-  }
-
-  test("SPARK-40311: withColumnsRenamed duplicate column names indirect") {
-    checkError(
-      exception = intercept[AnalysisException] {
-        person.withColumnsRenamed(Map("id" -> "renamed1", "renamed1" -> "age"))
-      },
-      errorClass = "COLUMN_ALREADY_EXISTS",
-      parameters = Map("columnName" -> "`age`"))
-  }
-
   test("SPARK-46260: withColumnsRenamed should respect the Map ordering") {
     val df = spark.range(10).toDF()
    assert(df.withColumnsRenamed(ListMap("id" -> "a", "a" -> "b")).columns === Array("b"))

