This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1ac74a1b170 [SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty
1ac74a1b170 is described below

commit 1ac74a1b170f52a093246d0bc13767160ddfbe46
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Aug 3 16:11:20 2022 +0900

[SPARK-39962][PYTHON][SQL] Apply projection when group attributes are empty

### What changes were proposed in this pull request?

This PR proposes to apply the projection to respect the reordered columns in its child when group attributes are empty.

### Why are the changes needed?

To respect the column order in the child.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a bug as below:

```python
import pandas as pd
from pyspark.sql import functions as f

@f.pandas_udf("double")
def AVG(x: pd.Series) -> float:
    return x.mean()

abc = spark.createDataFrame([(1.0, 5.0, 17.0)], schema=["a", "b", "c"])
abc.agg(AVG("a"), AVG("c")).show()
abc.select("c", "a").agg(AVG("a"), AVG("c")).show()
```

**Before**

```
+------+------+
|AVG(a)|AVG(c)|
+------+------+
|  17.0|   1.0|
+------+------+
```

**After**

```
+------+------+
|AVG(a)|AVG(c)|
+------+------+
|   1.0|  17.0|
+------+------+
```

### How was this patch tested?

Manually tested, and added a unit test.

Closes #37390 from HyukjinKwon/SPARK-39962.
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 5335c784ae76c9cc0aaa7a4b57b3cd6b3891ad9a) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../spark/sql/execution/python/AggregateInPandasExec.scala | 5 +++-- .../apache/spark/sql/execution/python/PythonUDFSuite.scala | 13 +++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala index 69802b143c1..0b0ed44958e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala @@ -131,12 +131,13 @@ case class AggregateInPandasExec( val newIter: Iterator[InternalRow] = mayAppendUpdatingSessionIterator(iter) val prunedProj = UnsafeProjection.create(allInputs.toSeq, child.output) - val grouped = if (groupingExpressions.isEmpty) { + val groupedItr = if (groupingExpressions.isEmpty) { // Use an empty unsafe row as a place holder for the grouping key Iterator((new UnsafeRow(), newIter)) } else { GroupedIterator(newIter, groupingExpressions, child.output) - }.map { case (key, rows) => + } + val grouped = groupedItr.map { case (key, rows) => (key, rows.map(prunedProj)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala index 45b57207c57..4ad7f901053 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala @@ -71,4 +71,17 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession { pythonTestUDF(count(pythonTestUDF(base("a") + 1)))) checkAnswer(df1, df2) 
} + + test("SPARK-39962: Global aggregation of Pandas UDF should respect the column order") { + assume(shouldTestGroupedAggPandasUDFs) + val df = Seq[(java.lang.Integer, java.lang.Integer)]((1, null)).toDF("a", "b") + + val pandasTestUDF = TestGroupedAggPandasUDF(name = "pandas_udf") + val reorderedDf = df.select("b", "a") + val actual = reorderedDf.agg( + pandasTestUDF(reorderedDf("a")), pandasTestUDF(reorderedDf("b"))) + val expected = df.agg(pandasTestUDF(df("a")), pandasTestUDF(df("b"))) + + checkAnswer(actual, expected) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org