gaogaotiantian commented on code in PR #53099:
URL: https://github.com/apache/spark/pull/53099#discussion_r2653901333
##########
python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py:
##########
@@ -81,8 +81,6 @@ def collectBatch(df, id):
except StreamingQueryException as e:
err_msg = str(e)
self.assertTrue("this should fail" in err_msg)
- # check for foreachBatch error class
- self.assertTrue("FOREACH_BATCH_USER_FUNCTION_ERROR" in err_msg)
Review Comment:
Before:
```
[STREAM_FAILED] Query [id = fd010fca-0598-47fc-91ea-a9cf8f8324f4, runId = 43b7d8d3-1df0-4bf8-b5ec-229d95c1819a] terminated with exception: [FOREACH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach sink. Reason: Traceback (most recent call last):
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
    raise error
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
    f.process(x)
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
    raise RuntimeError("test error")
RuntimeError: test error
SQLSTATE: 39000 SQLSTATE: XXKST
=== Streaming Query ===
Identifier: [id = fd010fca-0598-47fc-91ea-a9cf8f8324f4, runId = 43b7d8d3-1df0-4bf8-b5ec-229d95c1819a]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz]: {"logOffset":0}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@4de70c1c,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000c001bae228@43c4937d)), fd010fca-0598-47fc-91ea-a9cf8f8324f4, Append
+- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz], [value#2]
JVM stacktrace:
org.apache.spark.sql.streaming.StreamingQueryException: Traceback (most recent call last):
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
    raise error
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
    f.process(x)
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
    raise RuntimeError("test error")
RuntimeError: test error
=== Streaming Query ===
Identifier: [id = fd010fca-0598-47fc-91ea-a9cf8f8324f4, runId = 43b7d8d3-1df0-4bf8-b5ec-229d95c1819a]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz]: {"logOffset":0}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@4de70c1c,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000c001bae228@43c4937d)), fd010fca-0598-47fc-91ea-a9cf8f8324f4, Append
+- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz], [value#2]
    at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:236)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
    raise error
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
    f.process(x)
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
    raise RuntimeError("test error")
RuntimeError: test error
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:691)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1082)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1064)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:631)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.streaming.WriterThread.run(PythonForeachWriter.scala:54)
```
After:
```
[STREAM_FAILED] Query [id = f43afccb-7f65-4e8a-9a6e-dd04139ed108, runId = 5496dd83-3cd5-404a-9265-ba647031c964] terminated with exception: [PYTHON_EXCEPTION] An exception was thrown from the Python worker: Traceback (most recent call last):
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
    raise error
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
    f.process(x)
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
    raise RuntimeError("test error")
RuntimeError: test error
SQLSTATE: 38000 SQLSTATE: XXKST
=== Streaming Query ===
Identifier: [id = f43afccb-7f65-4e8a-9a6e-dd04139ed108, runId = 5496dd83-3cd5-404a-9265-ba647031c964]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk]: {"logOffset":0}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@30111fd7,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000b801bad578@603a8a1c)), f43afccb-7f65-4e8a-9a6e-dd04139ed108, Append
+- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk], [value#2]
JVM stacktrace:
org.apache.spark.sql.streaming.StreamingQueryException: [PYTHON_EXCEPTION] An exception was thrown from the Python worker: Traceback (most recent call last):
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
    raise error
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
    f.process(x)
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
    raise RuntimeError("test error")
RuntimeError: test error
SQLSTATE: 38000
=== Streaming Query ===
Identifier: [id = f43afccb-7f65-4e8a-9a6e-dd04139ed108, runId = 5496dd83-3cd5-404a-9265-ba647031c964]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk]: {"logOffset":0}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@30111fd7,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000b801bad578@603a8a1c)), f43afccb-7f65-4e8a-9a6e-dd04139ed108, Append
+- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk], [value#2]
    at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:236)
Caused by: org.apache.spark.api.python.PythonException: [PYTHON_EXCEPTION] An exception was thrown from the Python worker: Traceback (most recent call last):
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
    process()
  File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
    raise error
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
    f.process(x)
  File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
    raise RuntimeError("test error")
RuntimeError: test error
SQLSTATE: 38000
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:692)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1086)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1068)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:631)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.streaming.WriterThread.run(PythonForeachWriter.scala:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1017)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2496)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:451)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:424)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:352)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:365)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2275)
    at org.apache.spark.sql.classic.Dataset.$anonfun$collect$1(Dataset.scala:1504)
    at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2265)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:712)
    at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2263)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:177)
    at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:285)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:139)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
    at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:308)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:810)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:92)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:250)
    at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2263)
    at org.apache.spark.sql.classic.Dataset.collect(Dataset.scala:1504)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$19(MicroBatchExecution.scala:1136)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:177)
    at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:285)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:139)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
    at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:308)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:810)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:92)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:250)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1118)
    at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1118)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:556)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:501)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:501)
    at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40)
    at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38)
    at org.apache.spark.sql.execution.streaming.runtime.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:71)
    at org.apache.spark.sql.execution.streaming.runtime.ProcessingTimeExecutor.execute(TriggerExecutor.scala:83)
    at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:501)
    at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:353)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:810)
    at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:313)
    ... 1 more
```
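   The practical upshot for this diff: the user-raised message ("this should fail" / "test error") survives the rewrapping, while the wrapping error class is an implementation detail that changed (in the logs above, from `[FOREACH_USER_FUNCTION_ERROR]` with SQLSTATE 39000 to `[PYTHON_EXCEPTION]` with SQLSTATE 38000), so the removed assertion on an error-class name would no longer hold. Below is a minimal, self-contained sketch of the assertion pattern the test keeps; the local SparkSession, the rate source, the `collect_batch` name, and the 60-second timeout are illustrative assumptions, not the PR's actual test code.
   ```python
   # Sketch: assert only on the user-raised message, not on the wrapper's
   # error class. Assumes a local SparkSession and a rate source.
   from pyspark.errors import StreamingQueryException
   from pyspark.sql import SparkSession

   spark = SparkSession.builder.master("local[1]").getOrCreate()

   def collect_batch(df, batch_id):
       # User function that fails, mirroring the test's raised exception.
       raise RuntimeError("this should fail")

   query = (
       spark.readStream.format("rate").load()
       .writeStream.foreachBatch(collect_batch)
       .start()
   )
   try:
       # Raises StreamingQueryException once the first micro-batch fails.
       query.awaitTermination(60)
   except StreamingQueryException as e:
       err_msg = str(e)
       # Stable: the user-raised message is preserved through the rewrapping.
       assert "this should fail" in err_msg
       # Brittle (what the diff removes): asserting on a wrapper error class
       # such as FOREACH_BATCH_USER_FUNCTION_ERROR, which can change.
   finally:
       query.stop()
   ```
   Asserting only on the user message keeps the test independent of which error class Spark uses to wrap Python worker failures.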