HyukjinKwon opened a new pull request #28745:
URL: https://github.com/apache/spark/pull/28745


   ### What changes were proposed in this pull request?
   
   This PR proposes to remove the unnecessary projection in grouped and co-grouped pandas UDFs. This projection can cause an analysis failure when the grouping column is specified in a different case (upper/lower) than the corresponding column in the return schema.
   
   Currently, a projection is added in grouped and co-grouped pandas UDFs to keep the grouping keys. For example,
   
   ```python
   from pyspark.sql.functions import *
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
   def my_pandas_udf(pdf):
       return pdf.assign(Score=0.5)
   
   df.groupby('COLUMN').apply(my_pandas_udf).show()
   ```
   
   adds a projection that includes the grouping keys:
   
   ```bash
   == Parsed Logical Plan ==
   'FlatMapGroupsInPandas ['COLUMN], my_pandas_udf(COLUMN#166L, Score#167L), [COLUMN#193, Score#194]
   +- 'Project ['COLUMN, column#166L, Score#167L]  # <---- Here
   ...
   ```
   
   which later causes a reference resolution failure:
   
   ```
   pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
   ```
   
   In fact, we don't need to project the grouping keys at all, because grouped and co-grouped pandas UDFs _always_ take _all_ columns as the input to the pandas UDF.
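   
   For illustration, here is a minimal sketch (assuming a live `spark` session, as in the examples above; the `show_columns` UDF is hypothetical) showing that the UDF already receives every column, grouping key included:
   
   ```python
   import pandas as pd
   from pyspark.sql.functions import pandas_udf, PandasUDFType
   
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   
   @pandas_udf("cols string", PandasUDFType.GROUPED_MAP)
   def show_columns(pdf):
       # pdf holds both 'column' and 'Score', even though we only group by 'column'
       return pd.DataFrame({"cols": [",".join(pdf.columns)]})
   
   df.groupby("column").apply(show_columns).show()
   # Expected to print a single row: "column,Score"
   ```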
   
   After this fix, the parsed plan looks like this:
   
   ```bash
   == Parsed Logical Plan ==
   'FlatMapGroupsInPandas ['COLUMN], my_pandas_udf(column#0L, Score#1L), [column#9, Score#10]
   +- LogicalRDD [column#0L, Score#1L], false
   ```
   
   
   
   ### Why are the changes needed?
   
   This change fixes two things:
   - A performance improvement: duplicated columns are no longer projected when the output schema contains the grouping key (see the sketch after this list).
   - A bug related to case sensitivity, shown below.
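   
   As a quick way to observe the duplicated projection, `explain(True)` prints the parsed and analyzed plans (a sketch reusing `df` and `my_pandas_udf` from the first example, with a matching-case grouping key so analysis succeeds):
   
   ```python
   # Before this fix, the plans are expected to contain an extra Project that
   # repeats the grouping key; after it, the UDF reads the source relation directly.
   df.groupby("column").apply(my_pandas_udf).explain(True)
   ```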
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. The following queries, which failed before, now succeed:
   
   
   
   ```python
   from pyspark.sql.functions import *
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
   def my_pandas_udf(pdf):
       return pdf.assign(Score=0.5)
   
   df.groupby('COLUMN').apply(my_pandas_udf).show()
   ```
   
   ```python
   df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
   df2 = spark.createDataFrame([(1, 1)], ("column", "value"))
   
   df1.groupby("COLUMN").cogroup(
       df2.groupby("COLUMN")
   ).applyInPandas(lambda r, l: r + l, df1.schema).show()
   ```
   
   Before:
   
   ```
   pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
   ```
   
   ```
   pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
   'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
   :- Project [COLUMN#9L, column#9L, value#10L]
   :  +- LogicalRDD [column#9L, value#10L], false
   +- Project [COLUMN#13L, column#13L, value#14L]
      +- LogicalRDD [column#13L, value#14L], false
   ```
   
   
   After:
   
   ```
   +------+-----+
   |column|Score|
   +------+-----+
   |     1|  0.5|
   +------+-----+
   ```
   
   ```
   +------+-----+
   |column|value|
   +------+-----+
   |     2|    2|
   +------+-----+
   ```
   
   ### How was this patch tested?
   
   

