GitHub user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/21467#discussion_r193307267
--- Diff: python/pyspark/tests.py ---
@@ -1291,27 +1291,31 @@ def test_pipe_unicode(self):
result = rdd.pipe('cat').collect()
self.assertEqual(data, result)
- def test_stopiteration_in_client_code(self):
+ def test_stopiteration_in_user_code(self):
def stopit(*x):
raise StopIteration()
seq_rdd = self.sc.parallelize(range(10))
keyed_rdd = self.sc.parallelize((x % 2, x) for x in range(10))
+ msg = "Caught StopIteration thrown from user's code; failing the
task"
- self.assertRaises(Py4JJavaError, seq_rdd.map(stopit).collect)
- self.assertRaises(Py4JJavaError, seq_rdd.filter(stopit).collect)
- self.assertRaises(Py4JJavaError,
seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
- self.assertRaises(Py4JJavaError, seq_rdd.foreach, stopit)
- self.assertRaises(Py4JJavaError, keyed_rdd.reduceByKeyLocally,
stopit)
- self.assertRaises(Py4JJavaError, seq_rdd.reduce, stopit)
- self.assertRaises(Py4JJavaError, seq_rdd.fold, 0, stopit)
+ self.assertRaisesRegexp(Py4JJavaError, msg,
seq_rdd.map(stopit).collect)
+ self.assertRaisesRegexp(Py4JJavaError, msg,
seq_rdd.filter(stopit).collect)
+ self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach,
stopit)
+ self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.reduce, stopit)
+ self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.fold, 0,
stopit)
+ self.assertRaisesRegexp(Py4JJavaError, msg, seq_rdd.foreach,
stopit)
+ self.assertRaisesRegexp(Py4JJavaError, msg,
+
seq_rdd.cartesian(seq_rdd).flatMap(stopit).collect)
# the exception raised is non-deterministic
--- End diff --
Yeah, I asked about this before. He explained that the exception can be thrown
non-deterministically on either the driver side or the executor side. We should
clarify this comment accordingly, since this is quite a core fix. Let's make
everything clear.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]