GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/19592
[SPARK-22347][SQL][PySpark] Support optionally running PythonUDFs in
conditional expressions
## What changes were proposed in this pull request?
Under the current execution mode of Python UDFs, Python UDFs used as branch
values or the else value in a CaseWhen expression are not well supported.
Batch execution of Python UDFs evaluates the UDFs in an operator against all
rows. This breaks the semantics of conditional expressions and causes
failures in some cases:
```python
from pyspark.sql import Row
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import IntegerType
df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
f = udf(lambda value: 10 / int(value), IntegerType())
whenExpr1 = when((col('x') > 0), f(col('x')))
df.select(whenExpr1).collect()  # Raises a division-by-zero error for the row with x=0
```
The patch fixes the issue by adding an extra argument to Python UDFs used
with conditional expressions. The argument carries the evaluated value of the
condition, so on the Python side the UDF can be run only when the condition
holds.
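The mechanism described above can be sketched in plain Python. This is an
illustrative model only, not the actual Spark worker internals: the function
name `eval_udf_with_condition` and the `(condition_value, arg)` row layout
are assumptions for the sake of the example.

```python
def eval_udf_with_condition(udf_func, rows):
    """Run the UDF only for rows whose evaluated condition is True;
    otherwise emit None so the CaseWhen branch falls through.
    Purely illustrative of the idea, not Spark's real implementation."""
    results = []
    for condition_value, arg in rows:
        if condition_value:
            results.append(udf_func(arg))
        else:
            results.append(None)  # branch not taken: the UDF is skipped
    return results

# With the UDF from the failing example, the row with x=0 never reaches
# the division, so no division-by-zero error is raised.
f = lambda value: 10 / int(value)
print(eval_udf_with_condition(f, [(True, 5), (False, 0)]))
```

Skipping the UDF when the condition is false restores the short-circuit
behavior a user expects from `when(...)`.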
## How was this patch tested?
Added Python tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/viirya/spark-1 SPARK-22347
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19592.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #19592
----
commit 0515435ec463cd40f69a00be74ff1efddc07bde4
Author: Liang-Chi Hsieh <[email protected]>
Date: 2017-10-26T11:56:02Z
Support optionally running a python udf when using with conditional
expressions.
----