[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21467


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-08 Thread e-dorigatti
Github user e-dorigatti commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r194041622
  
--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,18 @@ def read_single_udf(pickleSer, infile, eval_type):
 else:
 row_func = chain(row_func, f)
 
+func = fail_on_stopiteration(row_func)
--- End diff --

Clearer?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r193307267
  
--- Diff: python/pyspark/tests.py ---
@@ -1291,27 +1291,31 @@ def test_pipe_unicode(self):
 result = rdd.pipe('cat').collect()
 self.assertEqual(data, result)
 
-def test_stopiteration_in_client_code(self):
+def test_stopiteration_in_user_code(self):
 
 def stopit(*x):
 raise StopIteration()
 
 seq_rdd = self.sc.parallelize(range(10))
 keyed_rdd = self.sc.parallelize((x % 2, x) for x in range(10))
+msg = "Caught StopIteration thrown from user's code; failing the task"
 
-self.assertRaises(Py4JJavaError, seq_rdd.map(stopit).collect)
-self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
-self.assertRaises(Py4JJavaError, seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
-self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
-self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally, stopit)
-self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
-self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.map(stopit).collect)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.filter(stopit).collect)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.reduce, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.fold, 0, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg,
+seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
 
 # the exception raised is non-deterministic
--- End diff --

Yea, I asked this before. He explained that the exception can be thrown on the 
driver side or on the executor side, non-deterministically. We should clarify 
this comment; it's quite a core fix. Let's clarify everything.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r193306421
  
--- Diff: python/pyspark/tests.py ---
@@ -1291,27 +1291,31 @@ def test_pipe_unicode(self):
 result = rdd.pipe('cat').collect()
 self.assertEqual(data, result)
 
-def test_stopiteration_in_client_code(self):
+def test_stopiteration_in_user_code(self):
 
 def stopit(*x):
 raise StopIteration()
 
 seq_rdd = self.sc.parallelize(range(10))
 keyed_rdd = self.sc.parallelize((x % 2, x) for x in range(10))
+msg = "Caught StopIteration thrown from user's code; failing the task"
 
-self.assertRaises(Py4JJavaError, seq_rdd.map(stopit).collect)
-self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
-self.assertRaises(Py4JJavaError, seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
-self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
-self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally, stopit)
-self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
-self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.map(stopit).collect)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.filter(stopit).collect)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.reduce, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.fold, 0, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach, stopit)
+self.assertRaisesRegexp(Py4JJavaError, msg,
+seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
 
 # the exception raised is non-deterministic
--- End diff --

What does this mean? The exception is non-deterministic?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r193302276
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4096,6 +4080,43 @@ def foo(df):
 def foo(k, v, w):
 return k
 
+def test_stopiteration_in_udf(self):
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
+from py4j.protocol import Py4JJavaError
+
+def foo(x):
+raise StopIteration()
+
+def foofoo(x, y):
+raise StopIteration()
+
+exc_message = "Caught StopIteration thrown from user's code; failing the task"
+df = self.spark.range(0, 100)
+
+# plain udf (test for SPARK-23754)
+self.assertRaisesRegexp(Py4JJavaError, exc_message, df.withColumn(
+'v', udf(foo)('id')
+).collect)
--- End diff --

tiny nit: I would do:

```
self.assertRaisesRegexp(
Py4JJavaError, exc_message, df.withColumn('v', udf(foo)('id')).collect)
```

or

```
self.assertRaisesRegexp(
Py4JJavaError,
exc_message,
df.withColumn('v', udf(foo)('id')).collect)
```


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192539912
  
--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
 """
 Get argspec of a function. Supports both Python 2 and Python 3.
 """
-
-if hasattr(f, '_argspec'):
-# only used for pandas UDF: they wrap the user function, losing its signature
-# workers need this signature, so UDF saves it here
-argspec = f._argspec
-elif sys.version_info[0] < 3:
+# `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

yea, I think the comment is for the else block.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192513260
  
--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
 """
 Get argspec of a function. Supports both Python 2 and Python 3.
 """
-
-if hasattr(f, '_argspec'):
-# only used for pandas UDF: they wrap the user function, losing its signature
-# workers need this signature, so UDF saves it here
-argspec = f._argspec
-elif sys.version_info[0] < 3:
+# `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

I meant: the comment itself can be moved back to the `else` block? (This is 
minor, though.)


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread e-dorigatti
Github user e-dorigatti commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192502611
  
--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
 """
 Get argspec of a function. Supports both Python 2 and Python 3.
 """
-
-if hasattr(f, '_argspec'):
-# only used for pandas UDF: they wrap the user function, losing its signature
-# workers need this signature, so UDF saves it here
-argspec = f._argspec
-elif sys.version_info[0] < 3:
+# `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

No, this is the purpose of this PR :) That's how we fixed a bug in [a previous 
PR](https://github.com/apache/spark/pull/21383), but we felt it was a hack, so 
now we are doing it properly.
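For background on the comment under discussion: `inspect.getargspec` is a Python 2 API, deprecated since Python 3.0 because it cannot describe function annotations or keyword-only arguments; on Python 3 the replacement is `inspect.getfullargspec`. A minimal sketch of the version branch (illustrative only; the real helper is `pyspark.util._get_argspec`, whose exact body is not reproduced here):

```python
import inspect
import sys


def get_argspec(f):
    """Return the argspec of f on both Python 2 and Python 3 (a sketch of
    the pattern discussed in this thread; the name is illustrative)."""
    if sys.version_info[0] < 3:
        # Python 2 only: deprecated since Python 3.0 because it cannot
        # represent annotations or keyword-only arguments
        return inspect.getargspec(f)
    # Python 3 replacement, aware of annotations and keyword-only arguments
    return inspect.getfullargspec(f)
```

On Python 3, `get_argspec(lambda key, value=1: key).args` is `['key', 'value']`.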


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192493299
  
--- Diff: python/pyspark/util.py ---
@@ -53,16 +53,11 @@ def _get_argspec(f):
 """
 Get argspec of a function. Supports both Python 2 and Python 3.
 """
-
-if hasattr(f, '_argspec'):
-# only used for pandas UDF: they wrap the user function, losing its signature
-# workers need this signature, so UDF saves it here
-argspec = f._argspec
-elif sys.version_info[0] < 3:
+# `getargspec` is deprecated since python3.0 (incompatible with function annotations).
--- End diff --

This change doesn't seem necessary... Let's move it back?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192456717
  
--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,20 @@ def read_single_udf(pickleSer, infile, eval_type):
 else:
 row_func = chain(row_func, f)
 
+# make sure StopIteration's raised in the user code are not
+# ignored, but re-raised as RuntimeError's
+func = fail_on_stopiteration(row_func)
--- End diff --

Ah, sure.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread e-dorigatti
Github user e-dorigatti commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192455582
  
--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,20 @@ def read_single_udf(pickleSer, infile, eval_type):
 else:
 row_func = chain(row_func, f)
 
+# make sure StopIteration's raised in the user code are not
+# ignored, but re-raised as RuntimeError's
+func = fail_on_stopiteration(row_func)
--- End diff --

I wanted to avoid the overhead of calling `get_argspec` even when it's not 
needed.
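The helper being moved works along these lines (a sketch; the actual implementation lives in `pyspark.util` and is not reproduced verbatim here): it catches a `StopIteration` escaping the user function and re-raises it as a `RuntimeError`, so it cannot be mistaken for normal exhaustion of the generator pipeline driving the task.

```python
def fail_on_stopiteration(f):
    """Wrap f so that a StopIteration raised inside it is re-raised as a
    RuntimeError instead of silently terminating the consuming generator
    (sketch of the helper discussed in this thread)."""
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError(
                "Caught StopIteration thrown from user's code; failing the task",
                exc)
    return wrapper
```

Note the wrapper has a generic `(*args, **kwargs)` signature, which is also why `read_single_udf` reads the argspec from the unwrapped `row_func` rather than from the wrapped function.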


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192437788
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,43 @@ def __call__(self, x):
 self.assertEqual(return_type, f_.returnType)
 
 def test_stopiteration_in_udf(self):
-# test for SPARK-23754
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from py4j.protocol import Py4JJavaError
 
+def do_test(action, *args, **kwargs):
+exc_message = "Caught StopIteration thrown from user's code; failing the task"
+with self.assertRaisesRegexp(Py4JJavaError, exc_message) as cm:
+action(*args, **kwargs)
+
 def foo(x):
 raise StopIteration()
 
-with self.assertRaises(Py4JJavaError) as cm:
-self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show()
+def foofoo(x, y):
+raise StopIteration()
 
-self.assertIn(
-"Caught StopIteration thrown from user's code; failing the task",
-cm.exception.java_exception.toString()
-)
+df = self.spark.range(0, 100)
+
+# plain udf (test for SPARK-23754)
+do_test(df.withColumn('v', udf(foo)('id')).show)
+
+# pandas scalar udf
+do_test(df.withColumn(
+'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
--- End diff --

Oh, actually let's put them in `PandasUDFTests`.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192437180
  
--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,20 @@ def read_single_udf(pickleSer, infile, eval_type):
 else:
 row_func = chain(row_func, f)
 
+# make sure StopIteration's raised in the user code are not
+# ignored, but re-raised as RuntimeError's
+func = fail_on_stopiteration(row_func)
--- End diff --

I think `row_func` name was fine. Let's just leave it as was.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192436670
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,43 @@ def __call__(self, x):
 self.assertEqual(return_type, f_.returnType)
 
 def test_stopiteration_in_udf(self):
-# test for SPARK-23754
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from py4j.protocol import Py4JJavaError
 
+def do_test(action, *args, **kwargs):
--- End diff --

Likewise, let's just do

```python
exc_message = "Caught StopIteration thrown from user's code; failing the task"

with self.assertRaisesRegexp(Py4JJavaError, exc_message):
... pandas_udf ...
```

I think it doesn't particularly reduce the code.
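For reference, `assertRaisesRegexp` matches its pattern argument against the string form of the raised exception; in Python 3.2 it was renamed `assertRaisesRegex` and the old spelling is a deprecated alias. A minimal standalone sketch of the suggested style (the exception type is swapped for `RuntimeError` here, since raising a real `Py4JJavaError` would require a running JVM):

```python
import unittest


class StopIterationMessageTest(unittest.TestCase):
    # standalone illustration; the message is the one used in the thread
    def test_stopiteration_message(self):
        exc_message = "Caught StopIteration thrown from user's code; failing the task"

        def failing_action():
            raise RuntimeError(exc_message)

        # the second argument is a regex matched against str(exception)
        with self.assertRaisesRegex(RuntimeError, exc_message):
            failing_action()
```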


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192436233
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,43 @@ def __call__(self, x):
 self.assertEqual(return_type, f_.returnType)
 
 def test_stopiteration_in_udf(self):
-# test for SPARK-23754
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
 from py4j.protocol import Py4JJavaError
 
+def do_test(action, *args, **kwargs):
+exc_message = "Caught StopIteration thrown from user's code; failing the task"
+with self.assertRaisesRegexp(Py4JJavaError, exc_message) as cm:
+action(*args, **kwargs)
+
 def foo(x):
 raise StopIteration()
 
-with self.assertRaises(Py4JJavaError) as cm:
-self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show()
+def foofoo(x, y):
+raise StopIteration()
 
-self.assertIn(
-"Caught StopIteration thrown from user's code; failing the task",
-cm.exception.java_exception.toString()
-)
+df = self.spark.range(0, 100)
+
+# plain udf (test for SPARK-23754)
+do_test(df.withColumn('v', udf(foo)('id')).show)
+
+# pandas scalar udf
+do_test(df.withColumn(
+'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
--- End diff --

Ah, in Jenkins we run these tests via Python 2, Python 3 and PyPy. Python 2 and 
PyPy don't have PyArrow installed, so it throws an exception. Each test should 
be placed in the matching `*PandasUDFTests` class. For example, this is a scalar 
Pandas UDF, which should be placed in `ScalarPandasUDFTests`.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-06-01 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192424377
  
--- Diff: python/pyspark/tests.py ---
@@ -1303,11 +1303,13 @@ def stopit(*x):
 self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
 self.assertRaises(Py4JJavaError, seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
 self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
-self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally, stopit)
 self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
 self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
--- End diff --

Sorry for the late comment; can you also change these tests to check the error 
message using `assertRaisesRegexp`?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192279606
  
--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,18 @@ def read_single_udf(pickleSer, infile, eval_type):
 else:
 row_func = chain(row_func, f)
 
+func = fail_on_stopiteration(row_func)
--- End diff --

Add a few comments for it?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192284083
  
--- Diff: python/pyspark/worker.py ---
@@ -140,15 +139,18 @@ def read_single_udf(pickleSer, infile, eval_type):
 else:
 row_func = chain(row_func, f)
 
+func = fail_on_stopiteration(row_func)
+
 # the last returnType will be the return type of UDF
 if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-return arg_offsets, wrap_scalar_pandas_udf(row_func, return_type)
+return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
 elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
-return arg_offsets, wrap_grouped_map_pandas_udf(row_func, return_type)
+argspec = _get_argspec(row_func)  # fails on func
--- End diff --

hmm, can we have a more meaningful comment than `fails on func`?
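The `# fails on func` remark can be demonstrated directly: the wrapper returned by `fail_on_stopiteration` has a generic `(*args, **kwargs)` signature, so inspecting the wrapped function yields an empty positional-argument list, and the argspec must be taken from the unwrapped `row_func`. A hypothetical sketch (the helper here is a simplified stand-in for the real one):

```python
import inspect


def fail_on_stopiteration(f):
    # simplified stand-in for the real helper; it does not copy f's
    # metadata, so the wrapper hides f's signature from inspect
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except StopIteration as exc:
            raise RuntimeError("StopIteration from user code", exc)
    return wrapper


def user_fn(key, pdf):
    return pdf


wrapped = fail_on_stopiteration(user_fn)

print(inspect.getfullargspec(user_fn).args)  # ['key', 'pdf']
print(inspect.getfullargspec(wrapped).args)  # []  (signature lost)
```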


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192274931
  
--- Diff: python/pyspark/flycheck_worker.py ---
@@ -0,0 +1,314 @@
+#
--- End diff --

I don't see where it is used.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192158213
  
--- Diff: python/pyspark/flycheck_worker.py ---
@@ -0,0 +1,314 @@
+#
--- End diff --

Seems to have been mistakenly added?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192119492
  
--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):
 
 
 def wrap_udf(f, return_type):
+f = fail_on_stopiteration(f)
--- End diff --

>move the call to get_argspec in read_single_udf

Yeah I think that works


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192117176
  
--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):
 
 
 def wrap_udf(f, return_type):
+f = fail_on_stopiteration(f)
--- End diff --

Actually, that could be problematic too. Let me think a bit.


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192116173
  
--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):
 
 
 def wrap_udf(f, return_type):
+f = fail_on_stopiteration(f)
--- End diff --

Can we wrap `fail_on_stopiteration` after `wrap*_udf`?


---




[GitHub] spark pull request #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop ite...

2018-05-31 Thread e-dorigatti
Github user e-dorigatti commented on a diff in the pull request:

https://github.com/apache/spark/pull/21467#discussion_r192116000
  
--- Diff: python/pyspark/worker.py ---
@@ -69,6 +69,7 @@ def chain(f, g):
 
 
 def wrap_udf(f, return_type):
+f = fail_on_stopiteration(f)
--- End diff --

Or just move the call to `get_argspec` in `read_single_udf`


---
