jingz-db commented on code in PR #47819:
URL: https://github.com/apache/spark/pull/47819#discussion_r1746189623
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterSuite.scala:
##########
@@ -157,6 +164,23 @@ class ForeachWriterSuite extends StreamTest with SharedSparkSession with BeforeA
     assert(errorEvent.error.get.getMessage === "ForeachSinkSuite error")
     // 'close' shouldn't be called with abort message if close with error has been called
     assert(allEvents(0).size == 3)
+
+ val sparkEx = ExecutorDeadException("network error")
+ val e2 = intercept[StreamingQueryException] {
+ val query2 = input.toDS().repartition(1).writeStream
+ .foreach(new TestForeachWriter() {
+ override def process(value: Int): Unit = {
+ super.process(value)
+ throw sparkEx
+ }
+ }).start()
+ query2.processAllAvailable()
+ }
+
+ // we didn't wrap the spark exception
+ assert(!e2.getMessage.contains(errClass))
+
assert(e2.getCause.getCause.asInstanceOf[ExecutorDeadException].getMessage
Review Comment:
The stack trace for `e2` is:
```
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 13) (ip-10-110-191-161.us-west-2.compute.internal executor driver): org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] network error SQLSTATE: XX000
  at org.apache.spark.sql.execution.streaming.sources.ForeachWriterSuite.$anonfun$new$15(ForeachWriterSuite.scala:168)
  at org.apache.spark.sql.execution.streaming.sources.ForeachWriterSuite.$anonfun$new$15$adapted(ForeachWriterSuite.scala:129)
...
```
And `e2.getCause` is:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 13) (ip-10-110-191-161.us-west-2.compute.internal executor driver): org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] network error SQLSTATE: XX000
  at org.apache.spark.sql.execution.streaming.sources.ForeachWriterSuite.$anonfun$new$15(ForeachWriterSuite.scala:168)
  at org.apache.spark.sql.execution.streaming.sources.ForeachWriterSuite.$anonfun$new$15$adapted(ForeachWriterSuite.scala:129)
...
```
Looks like before my change, every foreach exception was already wrapped in this extra level of `SparkException`: https://github.com/apache/spark/pull/47819/files#diff-6ed6f67f5a52591e4293b31b509c6c3a9007070ddd4a89e4907a2c9865ca1970L146. So this behavior is preserved by my change.
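
In other words, the cause chain is `StreamingQueryException` -> `SparkException` (the stage-failure wrapper) -> `ExecutorDeadException`. A minimal sketch of an assertion over that chain, reusing `e2` from the diff above (the helper name is hypothetical, not part of the PR):
```scala
import org.apache.spark.{ExecutorDeadException, SparkException}
import org.apache.spark.sql.streaming.StreamingQueryException

// Hypothetical helper: walks the cause chain shown in the traces above.
// StreamingQueryException -> SparkException (stage failure) -> ExecutorDeadException
def assertCauseChain(e2: StreamingQueryException): Unit = {
  // Outer cause: the extra SparkException layer added when the stage aborts.
  assert(e2.getCause.isInstanceOf[SparkException])
  // Inner cause: the original exception thrown from ForeachWriter.process().
  val inner = e2.getCause.getCause
  assert(inner.isInstanceOf[ExecutorDeadException])
  assert(inner.getMessage.contains("network error"))
}
```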