Github user xguo27 commented on the pull request:

    https://github.com/apache/spark/pull/10935#issuecomment-190554648
  
    Using these two functionally equavalent code snippets:
    
    Scala
    ```
    val data = Seq((1, "1"), (2, "2"), (3, "2"), (1, "3")).toDF("a","b")
    val my_filter = sqlContext.udf.register("my_filter", (a:Int) => a==1)
    data.select(col("a")).distinct().filter(my_filter(col("a")))
    ```
    
    Python
    ```
    data = sqlContext.createDataFrame([(1, "1"), (2, "2"), (3, "2"), (1, "3")], 
["a", "b"])
    my_filter = udf(lambda a: a == 1, BooleanType())
    data.select(col("a")).distinct().filter(my_filter(col("a")))
    ```
    
    The logical plan comes out `execute(aggregateCondition)` in here is as 
below:
    
    
https://github.com/apache/spark/blob/916fc34f98dd731f607d9b3ed657bad6cc30df2c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L801
    
    Scala
    ```
    Aggregate [a#8], [UDF(a#8) AS havingCondition#11]
    +- Project [a#8]
       +- Project [_1#6 AS a#8,_2#7 AS b#9]
          +- LocalRelation [_1#6,_2#7], [[1,1],[2,2],[3,2],[1,3]]
    ```
    
    Python
    ```
    Project [havingCondition#2]
    +- Aggregate [a#0L], [pythonUDF#3 AS havingCondition#2]
       +- EvaluatePython PythonUDF#<lambda>(a#0L), pythonUDF#3: boolean
          +- Project [a#0L]
             +- LogicalRDD [a#0L,b#1], MapPartitionsRDD[4] at 
applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2
    ```
    We can see in Python's case, we inject an extra Project when 
`execute(aggregateCondition)`going through ExtractPythonUDFs, but 
ResolveAggregateFunctions expects an Aggregate here:
    
    
https://github.com/apache/spark/blob/916fc34f98dd731f607d9b3ed657bad6cc30df2c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L801-L805
    
    
    With this fix, the logical plan generated for Python UDFs does not 
construct a Project if it is an Aggregate, making it consistent with its Scala 
counterpart, which gives correct results for ResolveAggregateFunctions to 
consume:
    
    After fix, Python:
    ```
    Aggregate [a#0L], [pythonUDF#3 AS havingCondition#2]
    +- EvaluatePython PythonUDF#<lambda>(a#0L), pythonUDF#3: boolean
       +- Project [a#0L]
          +- LogicalRDD [a#0L,b#1], MapPartitionsRDD[4] at 
applySchemaToPythonRDD at NativeMethodAccessorImpl.java:-2
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to