Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/21467#discussion_r192114330
--- Diff: python/pyspark/sql/tests.py ---
@@ -901,20 +901,40 @@ def __call__(self, x):
self.assertEqual(return_type, f_.returnType)
def test_stopiteration_in_udf(self):
- # test for SPARK-23754
- from pyspark.sql.functions import udf
+ from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from py4j.protocol import Py4JJavaError
+ def do_test(action, *args, **kwargs):
+ with self.assertRaises(Py4JJavaError) as cm:
+ action(*args, **kwargs)
+
+ self.assertIn(
+ "Caught StopIteration thrown from user's code; failing the task",
+ cm.exception.java_exception.toString()
+ )
+
def foo(x):
raise StopIteration()
- with self.assertRaises(Py4JJavaError) as cm:
- self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show()
+ df = self.spark.range(0, 100)
- self.assertIn(
- "Caught StopIteration thrown from user's code; failing the task",
- cm.exception.java_exception.toString()
- )
+ # plain udf (test for SPARK-23754)
+ do_test(df.withColumn('v', udf(foo)('id')).show)
+
+ # pandas scalar udf
+ do_test(df.withColumn(
+ 'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
+ ).show)
+
+ # pandas grouped map
+ do_test(df.groupBy('id').apply(
+ pandas_udf(foo, df.schema, PandasUDFType.GROUPED_MAP)
--- End diff --
apply.show()?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]