Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/21383#discussion_r191069075
--- Diff: python/pyspark/util.py ---
@@ -89,6 +93,33 @@ def majorMinorVersion(sparkVersion):
                          " version numbers.")
+def fail_on_stopiteration(f):
+    """
+    Wraps the input function to fail on 'StopIteration' by raising a
+    'RuntimeError'; prevents silent loss of data when 'f' is used in a for loop.
+    """
+    def wrapper(*args, **kwargs):
+        try:
+            return f(*args, **kwargs)
+        except StopIteration as exc:
+            raise RuntimeError(
+                "Caught StopIteration thrown from user's code; failing the task",
+                exc
+            )
+
+    # prevent inspect from failing,
+    # e.g. inspect.getargspec(sum) raises
+    #   TypeError: <built-in function sum> is not a Python function
+    try:
+        argspec = _get_argspec(f)
--- End diff ---
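(Aside, for context on the docstring's claim about silent loss of data: a tiny, runnable Python 3 repro. `udf_like` is a made-up stand-in for user code, not anything from this PR.)
```python
def udf_like(x):
    if x == 2:
        raise StopIteration("user bug")  # user code accidentally raises this
    return x * 10

# map() is a C-level iterator, so the StopIteration raised by udf_like leaks
# out of map.__next__ and list() treats it as normal exhaustion: the remaining
# rows are dropped silently instead of failing the task.
print(list(map(udf_like, [0, 1, 2, 3])))  # -> [0, 10]  (rows 2 and 3 lost)
```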
Can't we just do `wrapper._wrapped_argspec = _get_argspec(f)` here? I would name it just `_argspec`.
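Something like this (a sketch only, assuming it lives next to `_get_argspec` in `pyspark/util.py`; the `None` fallback for builtins is my assumption, not code from this PR):
```python
def fail_on_stopiteration(f):
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task",
                exc)

    try:
        # stash the original argspec on the wrapper instead of keeping a local;
        # callers that need it can read wrapper._argspec back directly
        wrapper._argspec = _get_argspec(f)
    except TypeError:
        # e.g. inspect.getargspec(sum) raises
        #   TypeError: <built-in function sum> is not a Python function
        wrapper._argspec = None
    return wrapper
```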
Also, `argspec` is only going to be used for Pandas UDFs for now. Let's do this hack within `_create_judf`.
e.g.,
```
if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
    # the hack
```
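Concretely, the hack could look roughly like this (a sketch under my assumptions: `_wrap_udf` is a hypothetical helper name standing in for the inline code in `_create_judf`, and `_get_argspec` / `fail_on_stopiteration` are the helpers this PR adds to `pyspark/util.py`):
```python
from pyspark.rdd import PythonEvalType
from pyspark.util import _get_argspec, fail_on_stopiteration


def _wrap_udf(func, evalType):
    # hypothetical helper sketching what _create_judf would do inline
    wrapped = fail_on_stopiteration(func)
    if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
                    PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
                    PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
        # only Pandas UDFs consume the argspec downstream, so attach it
        # here rather than unconditionally in fail_on_stopiteration
        wrapped._argspec = _get_argspec(func)
    return wrapped
```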
---