GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/20409

    Reset the cache in asNondeterministic to set deterministic properly

    ## What changes were proposed in this pull request?
    
    
    Reproducer:
    
    ```python
    from pyspark.sql.functions import udf
    f = udf(lambda x: x)
    spark.range(1).select(f("id"))  # this call caches the JVM UDF instance
    f = f.asNondeterministic()
    
    spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
    ```
    
    It should return `False`, but the current master returns `True`. This seems to be because we cache the JVM UDF instance once the UDF is called, and then reuse it even after `deterministic` is changed.
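    
    For reference, below is a minimal sketch of the caching pattern involved. The attribute names (`deterministic`, `_judf_placeholder`, `_create_judf`) follow `python/pyspark/sql/udf.py`, but the bodies are simplified and the py4j call is stubbed out, so treat it as an illustration rather than the actual source:
    
    ```python
    class UserDefinedFunction(object):
        def __init__(self, func):
            self.func = func
            self.deterministic = True
            self._judf_placeholder = None  # stands in for the cached JVM UDF
    
        def _create_judf(self):
            # Stub for the py4j call that builds the JVM UDF; note that it
            # captures the flag as it is at creation time.
            return {"udfDeterministic": self.deterministic}
    
        @property
        def _judf(self):
            # Built once and cached: flipping `deterministic` afterwards
            # never reaches the already-cached instance (the bug above).
            if self._judf_placeholder is None:
                self._judf_placeholder = self._create_judf()
            return self._judf_placeholder
    
        def asNondeterministic(self):
            self.deterministic = False
            # The fix in this PR: reset the cache so that the next `_judf`
            # access rebuilds the JVM UDF with the updated flag.
            self._judf_placeholder = None
            return self
    ```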
    
    For an easier reproducer, apply the diff below, which exposes the wrapped `UserDefinedFunction` so that the cached JVM instance can be inspected directly:
    
    
    ```diff
    diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
    index de96846c5c7..026a78bf547 100644
    --- a/python/pyspark/sql/udf.py
    +++ b/python/pyspark/sql/udf.py
    @@ -180,6 +180,7 @@ class UserDefinedFunction(object):
             wrapper.deterministic = self.deterministic
             wrapper.asNondeterministic = functools.wraps(
                 self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
    +        wrapper._unwrapped = lambda: self
             return wrapper
    
         def asNondeterministic(self):
    ```
    
    
    **Before**
    
    ```python
    >>> from pyspark.sql.functions import udf
    >>> f = udf(lambda x: x)
    >>> spark.range(1).select(f("id"))
    DataFrame[<lambda>(id): string]
    >>> f._unwrapped()._judf_placeholder.udfDeterministic()
    True
    >>> ndf = f.asNondeterministic()
    >>> ndf.deterministic
    False
    >>> spark.range(1).select(ndf("id"))
    DataFrame[<lambda>(id): string]
    >>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
    True
    ```
    
    **After**
    
    
    ```python
    >>> from pyspark.sql.functions import udf
    >>> f = udf(lambda x: x)
    >>> spark.range(1).select(f("id"))
    DataFrame[<lambda>(id): string]
    >>> f._unwrapped()._judf_placeholder.udfDeterministic()
    True
    >>> ndf = f.asNondeterministic()
    >>> ndf.deterministic
    False
    >>> spark.range(1).select(ndf("id"))
    DataFrame[<lambda>(id): string]
    >>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
    False
    ```
    
    ## How was this patch tested?
    
    Manually tested. I am not sure whether I should add a test for this, since it would require a lot of JVM accesses.
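    
    If a test is wanted despite the JVM accesses, a sketch along these lines could be added (the test name is hypothetical, and it assumes a unittest-style base class that provides `self.spark`, e.g. `ReusedSQLTestCase` in `python/pyspark/sql/tests.py`; the assertions just mirror the reproducer above):
    
    ```python
    def test_asNondeterministic_resets_cached_judf(self):
        from pyspark.sql.functions import udf
        f = udf(lambda x: x)
        # Populate the cached JVM UDF instance first.
        self.spark.range(1).select(f("id")).collect()
        f = f.asNondeterministic()
        # The rebuilt JVM UDF should now carry deterministic=false.
        expr = self.spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head()
        self.assertFalse(expr.deterministic())
    ```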


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-23233

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20409
    
----
commit 317c4fd54ecb707b92088c62aebd551805ecae8f
Author: hyukjinkwon <gurwls223@...>
Date:   2018-01-26T15:01:22Z

    Reset the cache in asNondeterministic to set deterministic properly

----

