GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/20379
[SPARK-23177][SQL][PySpark][Backport-2.3] Extract zero-parameter UDFs from
aggregate
## What changes were proposed in this pull request?
We extract Python UDFs in logical aggregate which depends on aggregate
expression or grouping key in ExtractPythonUDFFromAggregate rule. But Python
UDFs which don't depend on above expressions should also be extracted to avoid
the issue reported in the JIRA.
A small code snippet to reproduce that issue looks like:
```python
import pyspark.sql.functions as f
df = spark.createDataFrame([(1,2), (3,4)])
f_udf = f.udf(lambda: str("const_str"))
df2 = df.distinct().withColumn("a", f_udf())
df2.show()
```
Error exception is raised as:
```
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding
attribute, tree: pythonUDF0#50
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
at
org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at
org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at
org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
at
org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)
```
This exception raises because `HashAggregateExec` tries to bind the aliased
Python UDF expression (e.g., `pythonUDF0#50 AS a#44`) to grouping key.
## How was this patch tested?
Added test.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/viirya/spark-1 SPARK-23177-backport-2.3
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20379.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20379
----
commit a66b5e0c4b81444974f02c7154111b47a1a5137c
Author: Liang-Chi Hsieh <viirya@...>
Date: 2018-01-24T06:50:53Z
Extract parameter-less UDFs from aggregate.
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]