GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/20409
Reset the cache in asNondeterministic to set deterministic properly

## What changes were proposed in this pull request?

Reproducer:

```python
from pyspark.sql.functions import udf
f = udf(lambda x: x)
spark.range(1).select(f("id"))  # cache JVM UDF instance.
f = f.asNondeterministic()
spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
```

This should return `False`, but the current master returns `True`. It seems that once the UDF is called, we cache the JVM UDF instance and keep reusing it, so the stale instance is still used even after `asNondeterministic()` flips the `deterministic` flag.

For an easy reproducer, with the diff below:

```diff
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index de96846c5c7..026a78bf547 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -180,6 +180,7 @@ class UserDefinedFunction(object):
         wrapper.deterministic = self.deterministic
         wrapper.asNondeterministic = functools.wraps(
             self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
+        wrapper._unwrapped = lambda: self
         return wrapper
 
     def asNondeterministic(self):
```

**Before**

```python
>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x)
>>> spark.range(1).select(f("id"))
DataFrame[<lambda>(id): string]
>>> f._unwrapped()._judf_placeholder.udfDeterministic()
True
>>> ndf = f.asNondeterministic()
>>> ndf.deterministic
False
>>> spark.range(1).select(ndf("id"))
DataFrame[<lambda>(id): string]
>>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
True
```

**After**

```python
>>> from pyspark.sql.functions import udf
>>> f = udf(lambda x: x)
>>> spark.range(1).select(f("id"))
DataFrame[<lambda>(id): string]
>>> f._unwrapped()._judf_placeholder.udfDeterministic()
True
>>> ndf = f.asNondeterministic()
>>> ndf.deterministic
False
>>> spark.range(1).select(ndf("id"))
DataFrame[<lambda>(id): string]
>>> ndf._unwrapped()._judf_placeholder.udfDeterministic()
False
```

## How was this patch tested?

Manually tested.
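To illustrate the bug pattern in isolation, here is a minimal, self-contained sketch in plain Python. It is not PySpark's actual code: `BackendUDF` is a hypothetical stand-in for the JVM-side UDF instance, and the class only mimics PySpark's lazy `_judf_placeholder` caching. Without the cache reset in `asNondeterministic`, the stale backend (built while `deterministic` was still `True`) would keep being reused.

```python
class BackendUDF:
    """Hypothetical stand-in for the JVM-side UDF instance."""
    def __init__(self, deterministic):
        self.deterministic = deterministic


class UserDefinedFunction:
    def __init__(self, func):
        self.func = func
        self.deterministic = True
        self._judf_placeholder = None  # cached backend instance

    @property
    def _judf(self):
        # Lazily create the backend and cache it, mirroring the
        # lazy-initialization pattern described in the PR.
        if self._judf_placeholder is None:
            self._judf_placeholder = BackendUDF(self.deterministic)
        return self._judf_placeholder

    def asNondeterministic(self):
        # The fix: reset the cache so the next access to _judf
        # rebuilds the backend with the updated flag. Dropping
        # this line reproduces the bug (stale deterministic=True).
        self._judf_placeholder = None
        self.deterministic = False
        return self


f = UserDefinedFunction(lambda x: x)
_ = f._judf                    # first use caches the backend
f = f.asNondeterministic()     # resets the cache and flips the flag
print(f._judf.deterministic)   # False
```

The design point is the same as in the patch: any mutator that changes state baked into a cached object must invalidate that cache, or callers observe the pre-mutation state.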
I am not sure if I should add a test that requires this many JVM accesses.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-23233

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20409.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20409

----

commit 317c4fd54ecb707b92088c62aebd551805ecae8f
Author: hyukjinkwon <gurwls223@...>
Date: 2018-01-26T15:01:22Z

    Reset the cache in asNondeterministic to set deterministic properly

----