bogao007 commented on code in PR #42986:
URL: https://github.com/apache/spark/pull/42986#discussion_r1329422764


##########
python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
##########
@@ -69,8 +73,32 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
     while True:
         df_ref_id = utf8_deserializer.loads(infile)
         batch_id = read_long(infile)
-        process(df_ref_id, int(batch_id))  # TODO(SPARK-44463): Propagate error to the user.
-        write_int(0, outfile)
+        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
+        # traceback string if error occurs.
+        try:
+            process(df_ref_id, int(batch_id))
+            write_int(0, outfile)
+        except BaseException as e:

Review Comment:
   done.
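
   For readers following the thread, the worker-side framing described in the
   diff comment above (write 0 on success, write -2 followed by the traceback
   string on failure) can be sketched roughly as follows. This is an
   illustration, not the code in the PR: `run_batch` is a hypothetical wrapper,
   the local `write_int` and `write_with_length` helpers are stand-ins written
   for this sketch, and the 4-byte big-endian integer encoding is an assumption
   chosen to match what a Java `DataInputStream.readInt()` expects.

```python
import io
import struct
import traceback

# Marker value taken from the diff comment ("write -2 with traceback string").
PYTHON_EXCEPTION_THROWN = -2


def write_int(value, stream):
    # Assumption: 4-byte big-endian signed int, readable by DataInputStream.readInt().
    stream.write(struct.pack("!i", value))


def write_with_length(data, stream):
    # Length-prefixed payload: the byte count first, then the bytes themselves.
    write_int(len(data), stream)
    stream.write(data)


def run_batch(process, df_ref_id, batch_id, outfile):
    # Hypothetical wrapper around the user's foreachBatch function.
    try:
        process(df_ref_id, batch_id)
        write_int(0, outfile)
    except BaseException:
        # Mirrors the `except BaseException` in the diff: report the failure
        # to the JVM side instead of letting the worker die silently.
        write_int(PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    outfile.flush()
```

   With an in-memory `io.BytesIO()` as `outfile`, a successful call leaves
   exactly four bytes in the buffer, while a failing call leaves the marker,
   a length, and the UTF-8 traceback.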



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging {
       dataOut.writeLong(args.batchId)
       dataOut.flush()
 
-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>
+            logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+          case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+            val exLength = dataIn.readInt()
+            val obj = new Array[Byte](exLength)
+            dataIn.readFully(obj)
+            val msg = new String(obj, StandardCharsets.UTF_8)
+            throw new IllegalStateException(s"Found error inside foreachBatch Python process: $msg")

Review Comment:
   updated.
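
   The read path in the hunk above (read a status int, and on the exception
   marker read a length followed by that many UTF-8 bytes) can be mirrored in
   Python for illustration. This is a sketch under assumptions, not the Scala
   code in the PR: `read_batch_result` and `read_exact` are hypothetical names,
   the value -2 for the exception marker comes from the Python-side diff
   comment, and the handling of any other status value is a guess because the
   quoted hunk is cut off before that case.

```python
import io
import struct

# Assumption: same marker value the Python worker writes on failure.
PYTHON_EXCEPTION_THROWN = -2


def read_exact(stream, n):
    # Rough analogue of DataInputStream.readFully: fail on a short read.
    data = stream.read(n)
    if len(data) != n:
        raise EOFError(f"expected {n} bytes, got {len(data)}")
    return data


def read_int(stream):
    # 4-byte big-endian signed int, matching DataInputStream.readInt().
    return struct.unpack("!i", read_exact(stream, 4))[0]


def read_batch_result(stream):
    # Hypothetical helper: returns None on success, raises on a reported error.
    ret = read_int(stream)
    if ret == 0:
        return None
    if ret == PYTHON_EXCEPTION_THROWN:
        length = read_int(stream)
        msg = read_exact(stream, length).decode("utf-8")
        raise RuntimeError(f"Found error inside foreachBatch Python process: {msg}")
    # Assumption: treat anything else as a protocol error.
    raise RuntimeError(f"Unexpected return value from Python worker: {ret}")
```

   Reading the length before the payload is what lets the receiver allocate
   the buffer up front and keeps the stream aligned for the next batch.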



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

