GitHub user e-dorigatti opened a pull request:
https://github.com/apache/spark/pull/21538
[SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from
driver to executor
SPARK-23754 was fixed in #21383 by changing the UDF code to wrap the user
function, but this required a hack to save its argspec. This PR reverts this
change and fixes the `StopIteration` bug in the worker.
The root of the problem is that when a user-supplied function raises a
`StopIteration`, pyspark might silently stop processing data if this function
is used in a for-loop. The solution is to catch `StopIteration` exceptions and
re-raise them as `RuntimeError`s, so that the execution fails and the error is
reported to the user. This is done using the `fail_on_stopiteration` wrapper,
in different ways depending on where the function is used:
- In RDDs, the user function is wrapped in the driver, because this
function is also called in the driver itself.
- In SQL UDFs, the function is wrapped in the worker, since all processing
happens there. Moreover, the worker needs the signature of the user function,
which is lost when the function is wrapped in the driver, and passing this
signature to the worker separately requires an unpleasant hack.
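
As a rough illustration of the two points above, here is a plain-Python
sketch (not the actual Spark code) of a wrapper in the spirit of
`fail_on_stopiteration`. It shows how a stray `StopIteration` can silently
truncate an iteration, and how a simple closure hides the signature of the
wrapped function. The function `user_func`, the error message, and the use of
`inspect.signature` (Python 3) are assumptions made for this example only.

```python
import inspect


def fail_on_stopiteration(f):
    """Sketch: turn a StopIteration raised inside f into a RuntimeError."""
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            # Hypothetical message; the point is that the task now fails loudly.
            raise RuntimeError("StopIteration raised in user code", exc)
    return wrapper


def user_func(x):
    # Hypothetical buggy user function: next() on an empty iterator
    # raises StopIteration when x == 2.
    if x == 2:
        return next(iter([]))
    return x * 10


# Unwrapped: the StopIteration escapes through map's iterator protocol and
# list() takes it as "end of data", so the remaining items are dropped.
truncated = list(map(user_func, [1, 2, 3]))
print(truncated)  # [10]

# Wrapped: the same bug now surfaces as an error instead of lost data.
try:
    list(map(fail_on_stopiteration(user_func), [1, 2, 3]))
    failed = False
except RuntimeError:
    failed = True
print(failed)  # True

# The plain closure exposes (*args, **kwargs) rather than the original
# parameters, which is why wrapping early loses the signature.
original_params = list(inspect.signature(user_func).parameters)
wrapped_params = list(
    inspect.signature(fail_on_stopiteration(user_func)).parameters)
print(original_params)  # ['x']
print(wrapped_params)   # ['args', 'kwargs']
```

Wrapping as late as possible, where the function is actually invoked, means
the code that inspects the signature still sees the original function.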
@HyukjinKwon
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/e-dorigatti/spark branch-2.3
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21538.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21538
----
commit 762893682e2bb1e7c5b065eab33e472660cdb4fb
Author: e-dorigatti <emilio.dorigatti@...>
Date: 2018-05-30T10:11:33Z
[SPARK-23754][PYTHON] Re-raising StopIteration in client code
Make sure that `StopIteration`s raised in users' code do not silently
interrupt processing by Spark, but are raised as exceptions to the users. The
users' functions are wrapped in `safe_iter` (in `shuffle.py`), which re-raises
`StopIteration`s as `RuntimeError`s.
Tested with unit tests that make sure the exceptions are indeed raised. I am
not sure how to check whether a `Py4JJavaError` contains my exception, so I
simply looked for the exception message in the Java exception's `toString`.
Can you propose a better way?
This is my original work, licensed in the same way as Spark.
Author: e-dorigatti <[email protected]>
Closes #21383 from e-dorigatti/fix_spark_23754.
(cherry picked from commit 0ebb0c0d4dd3e192464dc5e0e6f01efa55b945ed)
commit e7db4688fba6ddd8168288c78d4106550211569b
Author: edorigatti <emilio.dorigatti@...>
Date: 2018-06-12T07:49:04Z
Merge remote-tracking branch 'upstream/branch-2.3' into branch-2.3
commit 217e730ec60e6b74fa12cf3e6ec6365be8c82aec
Author: edorigatti <emilio.dorigatti@...>
Date: 2018-06-11T02:15:42Z
[SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from
driver to executor
SPARK-23754 was fixed in #21383 by changing the UDF code to wrap the user
function, but this required a hack to save its argspec. This PR reverts this
change and fixes the `StopIteration` bug in the worker.
The root of the problem is that when a user-supplied function raises a
`StopIteration`, pyspark might silently stop processing data if this function
is used in a for-loop. The solution is to catch `StopIteration` exceptions and
re-raise them as `RuntimeError`s, so that the execution fails and the error is
reported to the user. This is done using the `fail_on_stopiteration` wrapper,
in different ways depending on where the function is used:
- In RDDs, the user function is wrapped in the driver, because this
function is also called in the driver itself.
- In SQL UDFs, the function is wrapped in the worker, since all processing
happens there. Moreover, the worker needs the signature of the user function,
which is lost when the function is wrapped in the driver, and passing this
signature to the worker separately requires an unpleasant hack.
Same tests, plus tests for pandas UDFs
Author: edorigatti <[email protected]>
Closes #21467 from e-dorigatti/fix_udf_hack.
----