GitHub user HyukjinKwon opened a pull request:
https://github.com/apache/spark/pull/20409
Reset the cache in asNondeterministic to set deterministic properly
## What changes were proposed in this pull request?
Reproducer:
```python
from pyspark.sql.functions import udf
f = udf(lambda x: x)
spark.range(1).select(f("id")) # cache JVM UDF instance.
f = f.asNondeterministic()
spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
```
It should return `False`, but the current master returns `True`. This seems to be because we cache the JVM UDF instance and keep reusing it even after `asNondeterministic()` disables `deterministic`, once the UDF has already been called.
For an easier check, I applied the diff below, which exposes the wrapped `UserDefinedFunction` via an `_unwrapped` helper so the cached JVM UDF can be inspected directly:
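The idea of the change, sketched below, is to reset that cache inside `asNondeterministic` so the JVM UDF is re-created with the updated flag the next time the UDF is used. This is a simplified sketch rather than the exact patch; it assumes the cached handle lives in `_judf_placeholder`, the attribute shown in the shell sessions further down.

```python
# Simplified sketch of the approach (not the exact patch in this PR):
# clear the cached JVM UDF handle so it is rebuilt lazily with the
# updated deterministic flag on the next use.
def asNondeterministic(self):
    self._judf_placeholder = None   # reset the cached JVM UDF instance
    self.deterministic = False
    return self
```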
```diff
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index de96846c5c7..026a78bf547 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -180,6 +180,7 @@ class UserDefinedFunction(object):
         wrapper.deterministic = self.deterministic
         wrapper.asNondeterministic = functools.wraps(
             self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
+        wrapper._unwrapped = lambda: self
         return wrapper
 
     def asNondeterministic(self):
```
**Before**
```python
>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x)
>>> spark.range(1).select(f("id"))
DataFrame[<lambda>(id): string]
>>> f._unwrapped()._judf_placeholder.udfDeterministic()
True
>>> ndf = f.asNondeterministic()
>>> ndf.deterministic
False
>>> spark.range(1).select(ndf("id"))
DataFrame[<lambda>(id): string]
>>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
True
```
**After**
```python
>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x)
>>> spark.range(1).select(f("id"))
DataFrame[<lambda>(id): string]
>>> f._unwrapped()._judf_placeholder.udfDeterministic()
True
>>> ndf = f.asNondeterministic()
>>> ndf.deterministic
False
>>> spark.range(1).select(ndf("id"))
DataFrame[<lambda>(id): string]
>>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
False
```
## How was this patch tested?
Manually tested. I am not sure whether it is worth adding a test that requires so many JVM accesses.
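For reference, a hypothetical regression test (not part of this PR) could simply follow the reproducer above; the plan-inspection call chain is the one from the reproducer, while the test name and the `spark` session fixture are assumptions:

```python
# Hypothetical test sketch (not part of this PR), reusing the reproducer above.
from pyspark.sql.functions import udf

def test_nondeterministic_after_first_use(spark):
    f = udf(lambda x: x)
    spark.range(1).select(f("id"))  # caches the JVM UDF instance
    nd = f.asNondeterministic()
    expr = spark.range(1).select(nd("id"))._jdf.logicalPlan().projectList().head()
    # The planned expression should reflect the nondeterministic flag.
    assert not expr.deterministic()
```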
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/HyukjinKwon/spark SPARK-23233
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20409.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20409
----
commit 317c4fd54ecb707b92088c62aebd551805ecae8f
Author: hyukjinkwon <gurwls223@...>
Date: 2018-01-26T15:01:22Z
Reset the cache in asNondeterministic to set deterministic properly
----