spark git commit: [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
Repository: spark Updated Branches: refs/heads/branch-1.6 9c8e17984 -> 0c23dd52d [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, they just return None. This will cause some weird NPE and confuse people. Author: Shixiong Zhu Closes #9847 from zsxwing/pyspark-streaming-exception. (cherry picked from commit be7a2cfd978143f6f265eca63e9e24f755bc9f22) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c23dd52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c23dd52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c23dd52 Branch: refs/heads/branch-1.6 Commit: 0c23dd52d64d4a3448fb7d21b0e40d13f885bcfa Parents: 9c8e179 Author: Shixiong Zhu Authored: Fri Nov 20 14:23:01 2015 -0800 Committer: Tathagata Das Committed: Fri Nov 20 14:23:18 2015 -0800 -- python/pyspark/streaming/tests.py | 16 ++++++++++++++++ python/pyspark/streaming/util.py | 3 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c23dd52/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 3403f6d..a0e0267 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -403,6 +403,22 @@ class BasicOperationTests(PySparkStreamingTestCase): expected = [[('k', v)] for v in expected] self._test_func(input, func, expected) +def test_failed_func(self): +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream = self.ssc.queueStream(input) + +def failed_func(i): +raise ValueError("failed") + +input_stream.map(failed_func).pprint() +self.ssc.start() +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +return + +self.fail("a failed func should throw an error") + class 
StreamingListenerTests(PySparkStreamingTestCase): http://git-wip-us.apache.org/repos/asf/spark/blob/0c23dd52/python/pyspark/streaming/util.py -- diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index b20613b..767c732 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -64,6 +64,7 @@ class TransformFunction(object): return r._jrdd except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunction(%s)" % self.func @@ -95,6 +96,7 @@ class TransformFunctionSerializer(object): return bytearray(self.serializer.dumps((func.func, func.deserializers))) except Exception: traceback.print_exc() +raise def loads(self, data): try: @@ -102,6 +104,7 @@ class TransformFunctionSerializer(object): return TransformFunction(self.ctx, f, *deserializers) except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunctionSerializer(%s)" % self.serializer - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
Repository: spark Updated Branches: refs/heads/branch-1.4 5118abb4e -> 94789f374 [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, they just return None. This will cause some weird NPE and confuse people. Author: Shixiong Zhu Closes #9847 from zsxwing/pyspark-streaming-exception. (cherry picked from commit be7a2cfd978143f6f265eca63e9e24f755bc9f22) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94789f37 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94789f37 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94789f37 Branch: refs/heads/branch-1.4 Commit: 94789f37400ea534e051d1df19f3a567646979fd Parents: 5118abb Author: Shixiong Zhu Authored: Fri Nov 20 14:23:01 2015 -0800 Committer: Tathagata Das Committed: Fri Nov 20 14:23:57 2015 -0800 -- python/pyspark/streaming/tests.py | 16 ++++++++++++++++ python/pyspark/streaming/util.py | 3 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/94789f37/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index ab1cc3f..830fd9e 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -376,6 +376,22 @@ class BasicOperationTests(PySparkStreamingTestCase): expected = [[('k', v)] for v in expected] self._test_func(input, func, expected) +def test_failed_func(self): +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream = self.ssc.queueStream(input) + +def failed_func(i): +raise ValueError("failed") + +input_stream.map(failed_func).pprint() +self.ssc.start() +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +return + +self.fail("a failed func should throw an error") + class 
WindowFunctionTests(PySparkStreamingTestCase): http://git-wip-us.apache.org/repos/asf/spark/blob/94789f37/python/pyspark/streaming/util.py -- diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index 34291f3..0a2773b 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -59,6 +59,7 @@ class TransformFunction(object): return r._jrdd except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunction(%s)" % self.func @@ -90,6 +91,7 @@ class TransformFunctionSerializer(object): return bytearray(self.serializer.dumps((func.func, func.deserializers))) except Exception: traceback.print_exc() +raise def loads(self, data): try: @@ -97,6 +99,7 @@ class TransformFunctionSerializer(object): return TransformFunction(self.ctx, f, *deserializers) except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunctionSerializer(%s)" % self.serializer - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
Repository: spark Updated Branches: refs/heads/branch-1.5 9a906c1c3 -> e9ae1fda9 [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, they just return None. This will cause some weird NPE and confuse people. Author: Shixiong Zhu Closes #9847 from zsxwing/pyspark-streaming-exception. (cherry picked from commit be7a2cfd978143f6f265eca63e9e24f755bc9f22) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9ae1fda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9ae1fda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9ae1fda Branch: refs/heads/branch-1.5 Commit: e9ae1fda9e6009cf95f9a98ba130297126155e06 Parents: 9a906c1 Author: Shixiong Zhu Authored: Fri Nov 20 14:23:01 2015 -0800 Committer: Tathagata Das Committed: Fri Nov 20 14:23:38 2015 -0800 -- python/pyspark/streaming/tests.py | 16 ++++++++++++++++ python/pyspark/streaming/util.py | 3 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e9ae1fda/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 41f94af..63a5fd0 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -391,6 +391,22 @@ class BasicOperationTests(PySparkStreamingTestCase): expected = [[('k', v)] for v in expected] self._test_func(input, func, expected) +def test_failed_func(self): +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream = self.ssc.queueStream(input) + +def failed_func(i): +raise ValueError("failed") + +input_stream.map(failed_func).pprint() +self.ssc.start() +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +return + +self.fail("a failed func should throw an error") + class 
WindowFunctionTests(PySparkStreamingTestCase): http://git-wip-us.apache.org/repos/asf/spark/blob/e9ae1fda/python/pyspark/streaming/util.py -- diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index b20613b..767c732 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -64,6 +64,7 @@ class TransformFunction(object): return r._jrdd except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunction(%s)" % self.func @@ -95,6 +96,7 @@ class TransformFunctionSerializer(object): return bytearray(self.serializer.dumps((func.func, func.deserializers))) except Exception: traceback.print_exc() +raise def loads(self, data): try: @@ -102,6 +104,7 @@ class TransformFunctionSerializer(object): return TransformFunction(self.ctx, f, *deserializers) except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunctionSerializer(%s)" % self.serializer - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
Repository: spark Updated Branches: refs/heads/master 9ed4ad426 -> be7a2cfd9 [SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, they just return None. This will cause some weird NPE and confuse people. Author: Shixiong Zhu Closes #9847 from zsxwing/pyspark-streaming-exception. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be7a2cfd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be7a2cfd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be7a2cfd Branch: refs/heads/master Commit: be7a2cfd978143f6f265eca63e9e24f755bc9f22 Parents: 9ed4ad4 Author: Shixiong Zhu Authored: Fri Nov 20 14:23:01 2015 -0800 Committer: Tathagata Das Committed: Fri Nov 20 14:23:01 2015 -0800 -- python/pyspark/streaming/tests.py | 16 ++++++++++++++++ python/pyspark/streaming/util.py | 3 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/be7a2cfd/python/pyspark/streaming/tests.py -- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 3403f6d..a0e0267 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -403,6 +403,22 @@ class BasicOperationTests(PySparkStreamingTestCase): expected = [[('k', v)] for v in expected] self._test_func(input, func, expected) +def test_failed_func(self): +input = [self.sc.parallelize([d], 1) for d in range(4)] +input_stream = self.ssc.queueStream(input) + +def failed_func(i): +raise ValueError("failed") + +input_stream.map(failed_func).pprint() +self.ssc.start() +try: +self.ssc.awaitTerminationOrTimeout(10) +except: +return + +self.fail("a failed func should throw an error") + class StreamingListenerTests(PySparkStreamingTestCase): http://git-wip-us.apache.org/repos/asf/spark/blob/be7a2cfd/python/pyspark/streaming/util.py -- 
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index b20613b..767c732 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -64,6 +64,7 @@ class TransformFunction(object): return r._jrdd except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunction(%s)" % self.func @@ -95,6 +96,7 @@ class TransformFunctionSerializer(object): return bytearray(self.serializer.dumps((func.func, func.deserializers))) except Exception: traceback.print_exc() +raise def loads(self, data): try: @@ -102,6 +104,7 @@ class TransformFunctionSerializer(object): return TransformFunction(self.ctx, f, *deserializers) except Exception: traceback.print_exc() +raise def __repr__(self): return "TransformFunctionSerializer(%s)" % self.serializer - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org