HyukjinKwon opened a new pull request #28777:
URL: https://github.com/apache/spark/pull/28777


   ### What changes were proposed in this pull request?
   
   This is another approach to fix the issue; see the previous attempt in https://github.com/apache/spark/pull/28745. That change was too invasive, so this PR takes a more conservative approach.
   
   This PR proposes to resolve the grouping attributes separately first, so that they can be referenced without ambiguity when `FlatMapGroupsInPandas` and `FlatMapCoGroupsInPandas` are resolved.
   
   Previously, 
   
   ```python
   from pyspark.sql.functions import *
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
   def my_pandas_udf(pdf):
       return pdf.assign(Score=0.5)
   
   df.groupby('COLUMN').apply(my_pandas_udf).show()
   ```
   
   failed as below:
   
   ```
   pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could 
be: COLUMN, COLUMN.;"
   ```
   because the unresolved `COLUMN` in `FlatMapGroupsInPandas` cannot tell which of the matching references in the child projection to take.
   
   After this fix, the child projection is resolved first together with the grouping keys, and the grouping attribute passed to `FlatMapGroupsInPandas` is the one selected positionally from that resolved projection.
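
   The gist of the approach can be sketched roughly as follows. This is a simplified, illustrative Scala sketch rather than the actual diff; the helper name, the `analyze` parameter, and the variable names are assumptions for illustration only:

   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
   import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

   // Illustrative helper (not the actual change): resolve the grouping keys against
   // the child first, then refer to them by position rather than by name.
   def resolveGroupingFirst(
       groupingExprs: Seq[Expression],
       child: LogicalPlan,
       analyze: LogicalPlan => LogicalPlan): (Seq[NamedExpression], LogicalPlan) = {
     // Name every grouping expression so it survives as an attribute of the projection.
     val groupingNamedExprs = groupingExprs.map {
       case ne: NamedExpression => ne
       case other => Alias(other, other.toString)()
     }
     // Resolve the child projection first, with the grouping keys prepended
     // to the child's own output.
     val project = analyze(Project(groupingNamedExprs ++ child.output, child))
     // Take the grouping attributes positionally from the resolved projection, so a
     // case-insensitive key such as 'COLUMN' never has to be disambiguated by name.
     val groupingAttributes = project.output.take(groupingNamedExprs.length)
     (groupingAttributes, project)
   }
   ```

   The resulting grouping attributes and projection can then be fed to `FlatMapGroupsInPandas` (and analogously to `FlatMapCoGroupsInPandas`), which no longer needs to resolve the grouping key by name against duplicated columns.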
   
   ### Why are the changes needed?
   
   To resolve grouping keys correctly.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. The following queries, which previously failed, now run correctly:
   
   
   
   ```python
   from pyspark.sql.functions import *
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
   def my_pandas_udf(pdf):
       return pdf.assign(Score=0.5)
   
   df.groupby('COLUMN').apply(my_pandas_udf).show()
   ```
   
   ```python
   df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
   df2 = spark.createDataFrame([(1, 1)], ("column", "value"))
   
   df1.groupby("COLUMN").cogroup(
       df2.groupby("COLUMN")
   ).applyInPandas(lambda r, l: r + l, df1.schema).show()
   ```
   
   Before (first query, then second query):
   
   ```
   pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could 
be: COLUMN, COLUMN.;
   ```
   
   ```
   pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input 
columns: [COLUMN, COLUMN, value, value];;
   'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, 
value#10L, column#13L, value#14L), [column#22L, value#23L]
   :- Project [COLUMN#9L, column#9L, value#10L]
   :  +- LogicalRDD [column#9L, value#10L], false
   +- Project [COLUMN#13L, column#13L, value#14L]
      +- LogicalRDD [column#13L, value#14L], false
   ```
   
   
   After (first query, then second query):
   
   ```
   +------+-----+
   |column|Score|
   +------+-----+
   |     1|  0.5|
   +------+-----+
   ```
   
   ```
   +------+-----+
   |column|value|
   +------+-----+
   |     2|    2|
   +------+-----+
   ```
   
   ### How was this patch tested?
   
   Unit tests were added, and the change was manually tested.

