[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21467

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Github user e-dorigatti commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r194041622

--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,18 @@ def read_single_udf(pickleSer, infile, eval_type):
         else:
             row_func = chain(row_func, f)

+    func = fail_on_stopiteration(row_func)
--- End diff --

Clearer?

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r193307267

--- Diff: python/pyspark/tests.py ---
@@ -1291,27 +1291,31 @@ def test_pipe_unicode(self):
         result = rdd.pipe('cat').collect()
         self.assertEqual(data, result)

-    def test_stopiteration_in_client_code(self):
+    def test_stopiteration_in_user_code(self):
         def stopit(*x):
             raise StopIteration()

         seq_rdd = self.sc.parallelize(range(10))
         keyed_rdd = self.sc.parallelize((x % 2, x) for x in range(10))
+        msg = "Caught StopIteration thrown from user's code; failing the task"

-        self.assertRaises(Py4JJavaError, seq_rdd.map(stopit).collect)
-        self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
-        self.assertRaises(Py4JJavaError, seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
-        self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
-        self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally, stopit)
-        self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
-        self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.map(stopit).collect)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.filter(stopit).collect)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.reduce, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.fold, 0, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg,
+                                seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)

         # the exception raised is non-deterministic
--- End diff --

Yea, I asked this before. He explained that the exception can be thrown non-deterministically on either the driver side or the executor side. We should clarify this comment. It's quite a core fix; let's clarify everything.
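
The fix under discussion guards against a subtle failure mode: when user code raises `StopIteration`, the surrounding iteration machinery can absorb it and silently truncate results instead of failing the task. A minimal standalone sketch of the idea — the `fail_on_stopiteration` below is an illustrative re-implementation in the spirit of the helper in `pyspark.util`, and `consume` is a stand-in for the iteration machinery, not Spark code:

```python
def fail_on_stopiteration(f):
    """Illustrative re-implementation: convert StopIteration raised by
    user code into RuntimeError so it cannot be mistaken for end-of-data."""
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as e:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task") from e
    return wrapper

def stopit(x):
    if x == 3:
        raise StopIteration()  # a user bug: wrong exception type
    return x

def consume(func, data):
    # models iteration machinery that treats StopIteration as exhaustion
    out, it = [], iter(data)
    while True:
        try:
            out.append(func(next(it)))
        except StopIteration:  # cannot tell a user bug from end-of-data
            break
    return out

print(consume(stopit, range(10)))  # [0, 1, 2] -- silently truncated
try:
    consume(fail_on_stopiteration(stopit), range(10))
except RuntimeError as e:
    print(e)  # Caught StopIteration thrown from user's code; failing the task
```

With the wrapper in place, the worker fails loudly with a recognizable message instead of returning a partial result, which is exactly what the tests above assert on.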

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r193306421

--- Diff: python/pyspark/tests.py ---
@@ -1291,27 +1291,31 @@ def test_pipe_unicode(self):
         result = rdd.pipe('cat').collect()
         self.assertEqual(data, result)

-    def test_stopiteration_in_client_code(self):
+    def test_stopiteration_in_user_code(self):
         def stopit(*x):
             raise StopIteration()

         seq_rdd = self.sc.parallelize(range(10))
         keyed_rdd = self.sc.parallelize((x % 2, x) for x in range(10))
+        msg = "Caught StopIteration thrown from user's code; failing the task"

-        self.assertRaises(Py4JJavaError, seq_rdd.map(stopit).collect)
-        self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
-        self.assertRaises(Py4JJavaError, seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
-        self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
-        self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally, stopit)
-        self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
-        self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.map(stopit).collect)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.filter(stopit).collect)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.reduce, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.fold, 0, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+        self.assertRaisesRegexp(Py4JJavaError, msg,
+                                seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)

         # the exception raised is non-deterministic
--- End diff --

What does this mean? The exception is non-deterministic?

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r193302276

--- Diff: python/pyspark/sql/tests.py ---
@@ -4096,6 +4080,43 @@ def foo(df):
         def foo(k, v, w):
             return k

+    def test_stopiteration_in_udf(self):
+        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
+        from py4j.protocol import Py4JJavaError
+
+        def foo(x):
+            raise StopIteration()
+
+        def foofoo(x, y):
+            raise StopIteration()
+
+        exc_message = "Caught StopIteration thrown from user's code; failing the task"
+        df = self.spark.range(0, 100)
+
+        # plain udf (test for SPARK-23754)
+        self.assertRaisesRegexp(Py4JJavaError, exc_message, df.withColumn(
+            'v', udf(foo)('id')
+        ).collect)
--- End diff --

tiny nit: I would do:

```
self.assertRaisesRegexp(
    Py4JJavaError, exc_message, df.withColumn('v', udf(foo)('id')).collect)
```

or

```
self.assertRaisesRegexp(
    Py4JJavaError,
    exc_message,
    df.withColumn('v', udf(foo)('id')).collect)
```

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192539912

--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
     """
     Get argspec of a function. Supports both Python 2 and Python 3.
     """
-
-    if hasattr(f, '_argspec'):
-        # only used for pandas UDF: they wrap the user function, losing its signature
-        # workers need this signature, so UDF saves it here
-        argspec = f._argspec
-    elif sys.version_info[0] < 3:
+    # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

yea, I think the comment is for the else block.

Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192513260

--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
     """
     Get argspec of a function. Supports both Python 2 and Python 3.
     """
-
-    if hasattr(f, '_argspec'):
-        # only used for pandas UDF: they wrap the user function, losing its signature
-        # workers need this signature, so UDF saves it here
-        argspec = f._argspec
-    elif sys.version_info[0] < 3:
+    # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

I meant: the comment itself can be moved back to the `else` block? (This is minor, though.)

Github user e-dorigatti commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192502611

--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
     """
     Get argspec of a function. Supports both Python 2 and Python 3.
     """
-
-    if hasattr(f, '_argspec'):
-        # only used for pandas UDF: they wrap the user function, losing its signature
-        # workers need this signature, so UDF saves it here
-        argspec = f._argspec
-    elif sys.version_info[0] < 3:
+    # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

No, this is the purpose of this PR :) That's how we fixed a bug in [a previous PR](https://github.com/apache/spark/pull/21383), but we felt it was a hack, so now we are doing it properly.

Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192493299

--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
     """
     Get argspec of a function. Supports both Python 2 and Python 3.
     """
-
-    if hasattr(f, '_argspec'):
-        # only used for pandas UDF: they wrap the user function, losing its signature
-        # workers need this signature, so UDF saves it here
-        argspec = f._argspec
-    elif sys.version_info[0] < 3:
+    # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

This change doesn't seem necessary... Let's move it back?
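
For context on the `getargspec` comment being debated: `inspect.getargspec` was deprecated in Python 3.0 and rejects functions with annotations, while `inspect.getfullargspec` is the Python 3 replacement. A hedged sketch of the version dispatch under discussion — illustrative names, not the exact body of `pyspark.util._get_argspec`:

```python
import inspect
import sys

def get_argspec(f):
    # Python 2 only has getargspec; on Python 3 it is deprecated (and
    # removed entirely in 3.11) and fails on annotated functions.
    if sys.version_info[0] < 3:
        return inspect.getargspec(f)
    return inspect.getfullargspec(f)

def foo(k, v, w):
    return k

print(get_argspec(foo).args)  # ['k', 'v', 'w']
```

On Python 3 the deprecated branch is never taken, so the sketch stays safe on interpreters where `getargspec` no longer exists.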

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192456717

--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,20 @@ def read_single_udf(pickleSer, infile, eval_type):
         else:
             row_func = chain(row_func, f)

+    # make sure StopIteration's raised in the user code are not
+    # ignored, but re-raised as RuntimeError's
+    func = fail_on_stopiteration(row_func)
--- End diff --

Ah, sure.

Github user e-dorigatti commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192455582

--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,20 @@ def read_single_udf(pickleSer, infile, eval_type):
         else:
             row_func = chain(row_func, f)

+    # make sure StopIteration's raised in the user code are not
+    # ignored, but re-raised as RuntimeError's
+    func = fail_on_stopiteration(row_func)
--- End diff --

I wanted to avoid the overhead of calling `get_argspec` even when it's not needed.
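
The ordering being discussed matters because wrapping destroys the signature: once `row_func` is wrapped, its argspec becomes `(*args, **kwargs)`, so the argspec has to be taken from the original function, and only when it is actually needed. A minimal sketch of that interaction, with `fail_on_stopiteration` re-implemented for illustration rather than quoted from Spark:

```python
import inspect

def fail_on_stopiteration(f):
    # illustrative wrapper: surface StopIteration as RuntimeError
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as e:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task") from e
    return wrapper

def user_fn(k, v):
    return k + v

wrapped = fail_on_stopiteration(user_fn)

# The wrapper hides the original signature...
print(inspect.getfullargspec(wrapped).args)  # []
# ...so the argspec must come from the unwrapped function, before wrapping,
# and only for the UDF types that actually need it (e.g. grouped map).
print(inspect.getfullargspec(user_fn).args)  # ['k', 'v']
```

This also explains the overhead remark above: computing the argspec eagerly for every UDF would pay the `inspect` cost even for UDF types that never use it.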

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192437788

--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,43 @@ def __call__(self, x):
         self.assertEqual(return_type, f_.returnType)

     def test_stopiteration_in_udf(self):
-        # test for SPARK-23754
-        from pyspark.sql.functions import udf
+        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
         from py4j.protocol import Py4JJavaError

+        def do_test(action, *args, **kwargs):
+            exc_message = "Caught StopIteration thrown from user's code; failing the task"
+            with self.assertRaisesRegexp(Py4JJavaError, exc_message) as cm:
+                action(*args, **kwargs)
+
         def foo(x):
             raise StopIteration()

-        with self.assertRaises(Py4JJavaError) as cm:
-            self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show()
+        def foofoo(x, y):
+            raise StopIteration()

-        self.assertIn(
-            "Caught StopIteration thrown from user's code; failing the task",
-            cm.exception.java_exception.toString()
-        )
+        df = self.spark.range(0, 100)
+
+        # plain udf (test for SPARK-23754)
+        do_test(df.withColumn('v', udf(foo)('id')).show)
+
+        # pandas scalar udf
+        do_test(df.withColumn(
+            'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
--- End diff --

Oh, actually let's put them in `PandasUDFTests`.

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192437180

--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,20 @@ def read_single_udf(pickleSer, infile, eval_type):
         else:
             row_func = chain(row_func, f)

+    # make sure StopIteration's raised in the user code are not
+    # ignored, but re-raised as RuntimeError's
+    func = fail_on_stopiteration(row_func)
--- End diff --

I think the `row_func` name was fine. Let's just leave it as it was.

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192436670

--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,43 @@ def __call__(self, x):
         self.assertEqual(return_type, f_.returnType)

     def test_stopiteration_in_udf(self):
-        # test for SPARK-23754
-        from pyspark.sql.functions import udf
+        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
         from py4j.protocol import Py4JJavaError

+        def do_test(action, *args, **kwargs):
--- End diff --

Likewise, let's just do

```python
exc_message = "Caught StopIteration thrown from user's code; failing the task"
with self.assertRaisesRegexp(Py4JJavaError, exc_message):
    ... pandas_udf ...
```

I think it doesn't particularly reduce the code.

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192436233

--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,43 @@ def __call__(self, x):
         self.assertEqual(return_type, f_.returnType)

     def test_stopiteration_in_udf(self):
-        # test for SPARK-23754
-        from pyspark.sql.functions import udf
+        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
         from py4j.protocol import Py4JJavaError

+        def do_test(action, *args, **kwargs):
+            exc_message = "Caught StopIteration thrown from user's code; failing the task"
+            with self.assertRaisesRegexp(Py4JJavaError, exc_message) as cm:
+                action(*args, **kwargs)
+
         def foo(x):
             raise StopIteration()

-        with self.assertRaises(Py4JJavaError) as cm:
-            self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show()
+        def foofoo(x, y):
+            raise StopIteration()

-        self.assertIn(
-            "Caught StopIteration thrown from user's code; failing the task",
-            cm.exception.java_exception.toString()
-        )
+        df = self.spark.range(0, 100)
+
+        # plain udf (test for SPARK-23754)
+        do_test(df.withColumn('v', udf(foo)('id')).show)
+
+        # pandas scalar udf
+        do_test(df.withColumn(
+            'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
--- End diff --

Ah, in Jenkins we run these tests via Python 2, Python 3, and PyPy. Python 2 and PyPy don't have PyArrow installed, so it throws an exception. It should be placed in the matching `*PandasUDFTests` class for each UDF type. For example, this is a scalar Pandas UDF, which should be placed in `ScalarPandasUDFTests`.

Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192424377

--- Diff: python/pyspark/tests.py ---
@@ -1303,11 +1303,13 @@ def stopit(*x):
         self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
         self.assertRaises(Py4JJavaError, seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
         self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
-        self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally, stopit)
         self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
         self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+        self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
--- End diff --

Sorry for the late comment: can you also change these tests to check the error message using `assertRaisesRegexp`?
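
The suggestion above — checking the message, not just the exception type — can be illustrated standalone with plain `unittest`. Note that `assertRaisesRegexp` is the old spelling (it is `assertRaisesRegex` in modern Python), and `failing_action` here is a hypothetical stand-in for the Spark actions in the real test:

```python
import unittest

MSG = "Caught StopIteration thrown from user's code; failing the task"

def failing_action():
    raise RuntimeError(MSG)

class StopIterationMessageTest(unittest.TestCase):
    def test_message_is_checked(self):
        # assertRaises only verifies the exception type...
        self.assertRaises(RuntimeError, failing_action)
        # ...assertRaisesRegex also matches the message, so a task that
        # fails for an unrelated reason no longer passes the test.
        self.assertRaisesRegex(RuntimeError, MSG, failing_action)

result = unittest.TestResult()
StopIterationMessageTest("test_message_is_checked").run(result)
print(result.wasSuccessful())  # True
```

The second argument is a regular expression searched against the exception's string form, which is why the tests in this PR can pass the full message verbatim.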

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192279606

--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,18 @@ def read_single_udf(pickleSer, infile, eval_type):
         else:
             row_func = chain(row_func, f)

+    func = fail_on_stopiteration(row_func)
--- End diff --

Add a few comments for it?

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192284083

--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,18 @@ def read_single_udf(pickleSer, infile, eval_type):
         else:
             row_func = chain(row_func, f)

+    func = fail_on_stopiteration(row_func)
+
     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        return arg_offsets, wrap_scalar_pandas_udf(row_func, return_type)
+        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-        return arg_offsets, wrap_grouped_map_pandas_udf(row_func, return_type)
+        argspec = _get_argspec(row_func)  # fails on func
--- End diff --

hmm, can we have a more meaningful comment than `fails on func`?

Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192274931

--- Diff: python/pyspark/flycheck_worker.py ---
@@ -0,0 +1,314 @@
+#
--- End diff --

I don't see where it is used.

Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192158213

--- Diff: python/pyspark/flycheck_worker.py ---
@@ -0,0 +1,314 @@
+#
--- End diff --

Seems mistakenly added..?

Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192119492

--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):

 def wrap_udf(f, return_type):
+    f = fail_on_stopiteration(f)
--- End diff --

> move the call to get_argspec in read_single_udf

Yeah, I think that works.

Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192117176

--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):

 def wrap_udf(f, return_type):
+    f = fail_on_stopiteration(f)
--- End diff --

Actually, that could be problematic too. Let me think a bit.

Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192116173

--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):

 def wrap_udf(f, return_type):
+    f = fail_on_stopiteration(f)
--- End diff --

Can we wrap `fail_on_stopiteration` after `wrap*_udf`?

Github user e-dorigatti commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21467#discussion_r192116000

--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):

 def wrap_udf(f, return_type):
+    f = fail_on_stopiteration(f)
--- End diff --

Or just move the call to `get_argspec` in `read_single_udf`.