HyukjinKwon opened a new pull request, #46426:
URL: https://github.com/apache/spark/pull/46426
### What changes were proposed in this pull request?
This PR reduces the traceback length so the actual error, `ZeroDivisionError`, can be tested in
`pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception`.
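For illustration, here is a minimal sketch of the scenario that test exercises: a streaming query whose Python UDF raises `ZeroDivisionError`, where the test then expects to find that error name in the surfaced exception. The `rate` source, `memory` sink, and query shape below are assumptions for the sketch, not the exact test code.

```python
# Hedged sketch of the scenario behind test_stream_exception;
# assumes an active SparkSession `spark` (classic or Spark Connect).
from pyspark.sql.functions import col, udf

bad_udf = udf(lambda x: 1 / 0, "int")  # always raises ZeroDivisionError

sdf = spark.readStream.format("rate").load()
sq = (
    sdf.select(bad_udf(col("value")))
    .writeStream.format("memory")
    .queryName("bad_query")
    .start()
)
try:
    sq.processAllAvailable()  # surfaces the failure as StreamingQueryException
except Exception as e:
    # The test walks the exception tree expecting to find the root cause;
    # that only works if the traceback has not pushed it out of the message.
    assert "ZeroDivisionError" in str(e)
finally:
    sq.stop()
```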
### Why are the changes needed?
So that the long traceback does not affect the test case. Without this change, the test can fail as below:
```
======================================================================
FAIL [1.883s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception
    sq.processAllAvailable()
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable
    self._execute_streaming_query_cmd(cmd)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd
    (_, properties) = self._session.client.execute_command(exec_cmd)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req)
                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
    self._handle_rpc_error(error)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
    raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 58) (fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main
    process()
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched
    for item in iterator:
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in <lambda>
    return args_kwargs_offsets, lambda *a: func(*a)
                                           ^^^^^^^^
  File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/util.py", line 134, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/runner/work/spark/spark-3....

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 291, in test_stream_exception
    self._assert_exception_tree_contains_msg(e, "ZeroDivisionError")
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 300, in _assert_exception_tree_contains_msg
    self._assert_exception_tree_contains_msg_connect(exception, msg)
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 305, in _assert_exception_tree_contains_msg_connect
    self.assertTrue(
AssertionError: False is not true : Exception tree doesn't contain the expected message: ZeroDivisionError

----------------------------------------------------------------------
```
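For context on the failure mode above: `_assert_exception_tree_contains_msg_connect` searches the exception's message chain for `ZeroDivisionError`, and when the worker traceback embedded in the `STREAM_FAILED` message is very long, the root-cause text can be truncated away (note the cut-off `spark-3....` line), so the search finds nothing. A hedged, illustrative re-implementation of that kind of check (not the actual helper, and the `__cause__`/`__context__` chaining is an assumption that may not hold for every Spark Connect exception) might look like:

```python
def exception_tree_contains(exc, msg, max_depth=10):
    """Illustrative sketch of an exception-tree search; the real helper
    is _assert_exception_tree_contains_msg_connect in test_streaming.py."""
    e = exc
    for _ in range(max_depth):
        if e is None:
            return False
        if msg in str(e):  # fails if truncation dropped the root cause
            return True
        # Assumed chaining; real Connect exceptions may nest differently.
        e = e.__cause__ or e.__context__
    return False
```

Shortening the traceback keeps `ZeroDivisionError` within the portion of the message the test actually inspects.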
### Does this PR introduce _any_ user-facing change?
No, test-only.
### How was this patch tested?
Tested in my own fork:
https://github.com/HyukjinKwon/spark/actions/runs/8978991632
### Was this patch authored or co-authored using generative AI tooling?
No.