This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 640ed4fad32f [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` column names duplication handling consistent with `withColumnRenamed` 640ed4fad32f is described below commit 640ed4fad32f1042d564166ddca2609190fb6a96 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Fri Mar 8 20:52:22 2024 +0900 [SPARK-47322][PYTHON][CONNECT] Make `withColumnsRenamed` column names duplication handling consistent with `withColumnRenamed` ### What changes were proposed in this pull request? Make `withColumnsRenamed` duplicated column name handling consistent with `withColumnRenamed` ### Why are the changes needed? `withColumnsRenamed` checks the column names duplication of the output dataframe; this is not consistent with `withColumnRenamed`: 1, `withColumnRenamed` doesn't do this check, and supports outputting a dataframe with duplicated column names; 2, when the input dataframe has duplicated column names, `withColumnsRenamed` always fails, even if the columns with the same name are not touched at all: ``` In [8]: df1 = spark.createDataFrame([(1, "id2"),], ["id", "value"]) ...: df2 = spark.createDataFrame([(1, 'x', 'id1'), ], ["id", 'a', "value"]) ...: join = df2.join(df1, on=['id'], how='left') ...: join Out[8]: DataFrame[id: bigint, a: string, value: string, value: string] In [9]: join.withColumnRenamed('id', 'value') Out[9]: DataFrame[value: bigint, a: string, value: string, value: string] In [10]: join.withColumnsRenamed({'id' : 'value'}) ... AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711 In [11]: join.withColumnRenamed('a', 'b') Out[11]: DataFrame[id: bigint, b: string, value: string, value: string] In [12]: join.withColumnsRenamed({'a' : 'b'}) ... AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. 
SQLSTATE: 42711 In [13]: join.withColumnRenamed('x', 'y') Out[13]: DataFrame[id: bigint, a: string, value: string, value: string] In [14]: join.withColumnsRenamed({'x' : 'y'}) AnalysisException: [COLUMN_ALREADY_EXISTS] The column `value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711 In [15]: join.withColumnRenamed('value', 'new_value') Out[15]: DataFrame[id: bigint, a: string, new_value: string, new_value: string] In [16]: join.withColumnsRenamed({'value' : 'new_value'}) AnalysisException: [COLUMN_ALREADY_EXISTS] The column `new_value` already exists. Choose another name or rename the existing column. SQLSTATE: 42711 ``` ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? updated tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #45431 from zhengruifeng/connect_renames. Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../connect/planner/SparkConnectProtoSuite.scala | 9 ----- python/pyspark/sql/tests/test_dataframe.py | 22 ++++++++++++ .../main/scala/org/apache/spark/sql/Dataset.scala | 3 -- .../org/apache/spark/sql/DataFrameSuite.scala | 39 ---------------------- 4 files changed, 22 insertions(+), 51 deletions(-) diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala index 1b50936d935a..b989f5027cf9 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala @@ -566,15 +566,6 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest { comparePlans( connectTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2")), 
sparkTestRelation.withColumnsRenamed(Map("id" -> "id1", "id" -> "id2"))) - - checkError( - exception = intercept[AnalysisException] { - transform( - connectTestRelation.withColumnsRenamed( - Map("id" -> "duplicatedCol", "name" -> "duplicatedCol"))) - }, - errorClass = "COLUMN_ALREADY_EXISTS", - parameters = Map("columnName" -> "`duplicatedcol`")) } test("Writes fails without path or table") { diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index ad7a3b8dfc96..38310f12a4a7 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -231,6 +231,28 @@ class DataFrameTestsMixin: message_parameters={"arg_name": "colsMap", "arg_type": "tuple"}, ) + def test_with_columns_renamed_with_duplicated_names(self): + df1 = self.spark.createDataFrame([(1, "v1")], ["id", "value"]) + df2 = self.spark.createDataFrame([(1, "x", "v2")], ["id", "a", "value"]) + join = df2.join(df1, on=["id"], how="left") + + self.assertEqual( + join.withColumnRenamed("id", "value").columns, + join.withColumnsRenamed({"id": "value"}).columns, + ) + self.assertEqual( + join.withColumnRenamed("a", "b").columns, + join.withColumnsRenamed({"a": "b"}).columns, + ) + self.assertEqual( + join.withColumnRenamed("value", "new_value").columns, + join.withColumnsRenamed({"value": "new_value"}).columns, + ) + self.assertEqual( + join.withColumnRenamed("x", "y").columns, + join.withColumnsRenamed({"x": "y"}).columns, + ) + def test_ordering_of_with_columns_renamed(self): df = self.spark.range(10) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 189be1d6a30d..f3bf6119659d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2932,9 +2932,6 @@ class Dataset[T] private[sql]( } ) } - SchemaUtils.checkColumnNameDuplication( - 
projectList.map(_.name), - sparkSession.sessionState.conf.caseSensitiveAnalysis) withPlan(Project(projectList, logicalPlan)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 1dc367fa6bf6..6b34a6412cc0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -789,45 +789,6 @@ class DataFrameSuite extends QueryTest assert(df.columns === Array("key", "value", "renamed1", "renamed2")) } - test("SPARK-40311: withColumnsRenamed case sensitive") { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val df = testData.toDF().withColumns(Seq("newCol1", "newCOL2"), - Seq(col("key") + 1, col("key") + 2)) - .withColumnsRenamed(Map("newCol1" -> "renamed1", "newCol2" -> "renamed2")) - checkAnswer( - df, - testData.collect().map { case Row(key: Int, value: String) => - Row(key, value, key + 1, key + 2) - }.toSeq) - assert(df.columns === Array("key", "value", "renamed1", "newCOL2")) - } - } - - test("SPARK-40311: withColumnsRenamed duplicate column names simple") { - checkError( - exception = intercept[AnalysisException] { - person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "renamed")) - }, - errorClass = "COLUMN_ALREADY_EXISTS", - parameters = Map("columnName" -> "`renamed`")) - } - - test("SPARK-40311: withColumnsRenamed duplicate column names simple case sensitive") { - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val df = person.withColumnsRenamed(Map("id" -> "renamed", "name" -> "Renamed")) - assert(df.columns === Array("renamed", "Renamed", "age")) - } - } - - test("SPARK-40311: withColumnsRenamed duplicate column names indirect") { - checkError( - exception = intercept[AnalysisException] { - person.withColumnsRenamed(Map("id" -> "renamed1", "renamed1" -> "age")) - }, - errorClass = "COLUMN_ALREADY_EXISTS", - parameters = Map("columnName" -> "`age`")) - } 
- test("SPARK-46260: withColumnsRenamed should respect the Map ordering") { val df = spark.range(10).toDF() assert(df.withColumnsRenamed(ListMap("id" -> "a", "a" -> "b")).columns === Array("b")) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org