gaogaotiantian commented on code in PR #53099:
URL: https://github.com/apache/spark/pull/53099#discussion_r2653901333


##########
python/pyspark/sql/tests/streaming/test_streaming_foreach_batch.py:
##########
@@ -81,8 +81,6 @@ def collectBatch(df, id):
         except StreamingQueryException as e:
             err_msg = str(e)
             self.assertTrue("this should fail" in err_msg)
-            # check for foreachBatch error class
-            self.assertTrue("FOREACH_BATCH_USER_FUNCTION_ERROR" in err_msg)

Review Comment:
   The user-visible error class changes from `[FOREACH_USER_FUNCTION_ERROR]` (SQLSTATE 39000) to the generic `[PYTHON_EXCEPTION]` (SQLSTATE 38000), while the Python traceback and the user's error message are preserved; a minimal sketch of the user-facing behavior follows the logs.

   Before:
   ```
   [STREAM_FAILED] Query [id = fd010fca-0598-47fc-91ea-a9cf8f8324f4, runId = 43b7d8d3-1df0-4bf8-b5ec-229d95c1819a] terminated with exception: [FOREACH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach sink. Reason: Traceback (most recent call last):
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
       process()
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
       raise error
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
       f.process(x)
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
       raise RuntimeError("test error")
   RuntimeError: test error
    SQLSTATE: 39000 SQLSTATE: XXKST
   === Streaming Query ===
   Identifier: [id = fd010fca-0598-47fc-91ea-a9cf8f8324f4, runId = 43b7d8d3-1df0-4bf8-b5ec-229d95c1819a]
   Current Committed Offsets: {}
   Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz]: {"logOffset":0}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   ~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@4de70c1c,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000c001bae228@43c4937d)), fd010fca-0598-47fc-91ea-a9cf8f8324f4, Append
   +- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz], [value#2]
   
   
   JVM stacktrace:
   org.apache.spark.sql.streaming.StreamingQueryException: Traceback (most recent call last):
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
       process()
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
       raise error
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
       f.process(x)
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
       raise RuntimeError("test error")
   RuntimeError: test error
   
   === Streaming Query ===
   Identifier: [id = fd010fca-0598-47fc-91ea-a9cf8f8324f4, runId = 43b7d8d3-1df0-4bf8-b5ec-229d95c1819a]
   Current Committed Offsets: {}
   Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz]: {"logOffset":0}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   ~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@4de70c1c,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000c001bae228@43c4937d)), fd010fca-0598-47fc-91ea-a9cf8f8324f4, Append
   +- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/af2fbd89-63cb-442d-a3fd-0347beb42891/tmptzii6xhz], [value#2]
   
           at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:390)
           at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:236)
   Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
       process()
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
       raise error
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
       f.process(x)
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
       raise RuntimeError("test error")
   RuntimeError: test error
   
           at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:691)
           at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1082)
           at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1064)
           at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:631)
           at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
           at org.apache.spark.sql.execution.python.streaming.WriterThread.run(PythonForeachWriter.scala:54)
   ```
   
   After:
   ```
   [STREAM_FAILED] Query [id = f43afccb-7f65-4e8a-9a6e-dd04139ed108, runId = 5496dd83-3cd5-404a-9265-ba647031c964] terminated with exception: [PYTHON_EXCEPTION] An exception was thrown from the Python worker: Traceback (most recent call last):
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
       process()
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
       raise error
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
       f.process(x)
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
       raise RuntimeError("test error")
   RuntimeError: test error
    SQLSTATE: 38000 SQLSTATE: XXKST
   === Streaming Query ===
   Identifier: [id = f43afccb-7f65-4e8a-9a6e-dd04139ed108, runId = 5496dd83-3cd5-404a-9265-ba647031c964]
   Current Committed Offsets: {}
   Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk]: {"logOffset":0}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   ~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@30111fd7,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000b801bad578@603a8a1c)), f43afccb-7f65-4e8a-9a6e-dd04139ed108, Append
   +- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk], [value#2]
   
   
   JVM stacktrace:
   org.apache.spark.sql.streaming.StreamingQueryException: [PYTHON_EXCEPTION] An exception was thrown from the Python worker: Traceback (most recent call last):
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
       process()
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
       raise error
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
       f.process(x)
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
       raise RuntimeError("test error")
   RuntimeError: test error
    SQLSTATE: 38000
   === Streaming Query ===
   Identifier: [id = f43afccb-7f65-4e8a-9a6e-dd04139ed108, runId = 5496dd83-3cd5-404a-9265-ba647031c964]
   Current Committed Offsets: {}
   Current Available Offsets: {FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk]: {"logOffset":0}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   ~WriteToMicroBatchDataSource ForeachWriterTable(org.apache.spark.sql.execution.python.streaming.PythonForeachWriter@30111fd7,Right(org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$$Lambda$1911/0x000000b801bad578@603a8a1c)), f43afccb-7f65-4e8a-9a6e-dd04139ed108, Append
   +- ~StreamingExecutionRelation FileStreamSource[file:/Users/tian.gao/programs/spark/python/target/7c5a8445-75c8-4c7e-8868-e51306016d62/tmpp1v7krfk], [value#2]
   
           at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:390)
           at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:236)
   Caused by: org.apache.spark.api.python.PythonException: [PYTHON_EXCEPTION] An exception was thrown from the Python worker: Traceback (most recent call last):
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3511, in main
       process()
     File "/Users/tian.gao/programs/spark/python/lib/pyspark.zip/pyspark/worker.py", line 3500, in process
       out_iter = func(split_index, iterator)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1435, in func_with_open_process_close
       raise error
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/streaming/readwriter.py", line 1428, in func_with_open_process_close
       f.process(x)
     File "/Users/tian.gao/programs/spark/python/pyspark/sql/tests/streaming/test_streaming_foreach.py", line 215, in process
       raise RuntimeError("test error")
   RuntimeError: test error
    SQLSTATE: 38000
           at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:692)
           at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1086)
           at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1068)
           at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:631)
           at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
           at org.apache.spark.sql.execution.python.streaming.WriterThread.run(PythonForeachWriter.scala:54)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1017)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2496)
           at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:451)
           at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:424)
           at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:352)
           at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:365)
           at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
           at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
           at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
           at org.apache.spark.sql.classic.Dataset.collectFromPlan(Dataset.scala:2275)
           at org.apache.spark.sql.classic.Dataset.$anonfun$collect$1(Dataset.scala:1504)
           at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$2(Dataset.scala:2265)
           at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:712)
           at org.apache.spark.sql.classic.Dataset.$anonfun$withAction$1(Dataset.scala:2263)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:177)
           at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:285)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:139)
           at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
           at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
           at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
           at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:139)
           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:308)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:138)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:810)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:92)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:250)
           at org.apache.spark.sql.classic.Dataset.withAction(Dataset.scala:2263)
           at org.apache.spark.sql.classic.Dataset.collect(Dataset.scala:1504)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$19(MicroBatchExecution.scala:1136)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:177)
           at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:285)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:139)
           at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
           at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
           at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
           at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:139)
           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:308)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:138)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:810)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:92)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:250)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runBatch$18(MicroBatchExecution.scala:1118)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1118)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:556)
           at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
           at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.reportTimeTaken(ProgressReporter.scala:200)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:521)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:501)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:501)
           at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40)
           at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38)
           at org.apache.spark.sql.execution.streaming.runtime.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:71)
           at org.apache.spark.sql.execution.streaming.runtime.ProcessingTimeExecutor.execute(TriggerExecutor.scala:83)
           at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:501)
           at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:353)
           at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:810)
           at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:313)
           ... 1 more
   ```
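   
   For reference, here is a minimal sketch of what user code sees after this change. It is not the exact test: it assumes a local SparkSession, uses the built-in `rate` source, and passes a plain function to `foreach` instead of the test's object with a `process` method.
   ```
   from pyspark.sql import SparkSession
   from pyspark.errors import StreamingQueryException
   
   spark = SparkSession.builder.master("local[1]").getOrCreate()
   
   def process_row(row):
       # Fails inside the Python worker, like f.process(x) in the logs above.
       raise RuntimeError("test error")
   
   query = (
       spark.readStream.format("rate").load()
       .writeStream.foreach(process_row)
       .start()
   )
   
   try:
       # awaitTermination re-raises the query failure as StreamingQueryException.
       query.awaitTermination()
   except StreamingQueryException as e:
       # The user's message and traceback are still in str(e); only the wrapping
       # error class changed ([PYTHON_EXCEPTION] instead of
       # [FOREACH_USER_FUNCTION_ERROR]), so asserting on the user-visible text
       # still passes.
       assert "test error" in str(e)
   ```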


