HyukjinKwon commented on code in PR #42986:
URL: https://github.com/apache/spark/pull/42986#discussion_r1329392064


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging {
       dataOut.writeLong(args.batchId)
       dataOut.flush()
 
-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>
+            logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+          case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+            val exLength = dataIn.readInt()
+            val obj = new Array[Byte](exLength)
+            dataIn.readFully(obj)
+            val msg = new String(obj, StandardCharsets.UTF_8)
+            throw new IllegalStateException(s"Found error inside foreachBatch Python process: $msg")

Review Comment:
   Should this throw `PythonException` instead of `IllegalStateException`, to match
   `PythonRunner.handlePythonException`? It would be even better if we could reuse them.
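
   To make the suggestion concrete, here is a rough, self-contained sketch of the shape this could take. It is not the actual Spark code: `SketchPythonException` is a hypothetical stand-in for Spark's `PythonException` so the snippet compiles without Spark on the classpath, `PythonExceptionThrown` is assumed to mirror `SpecialLengths.PYTHON_EXCEPTION_THROWN` (the value `-2` is used here for illustration), and `readPythonException` / `handleResult` are made-up helper names. The length-prefixed UTF-8 read is the same one the diff performs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for Spark's PythonException (assumption, not the real class).
class SketchPythonException(msg: String, cause: Throwable)
  extends RuntimeException(msg, cause)

object ForeachBatchErrorSketch {
  // Assumed to mirror SpecialLengths.PYTHON_EXCEPTION_THROWN; the value is illustrative.
  val PythonExceptionThrown: Int = -2

  // Reads a length-prefixed UTF-8 error message and wraps it in the exception type.
  // This is the part that could be shared with the existing runner logic.
  def readPythonException(in: DataInputStream): SketchPythonException = {
    val exLength = in.readInt()
    val obj = new Array[Byte](exLength)
    in.readFully(obj)
    new SketchPythonException(new String(obj, StandardCharsets.UTF_8), null)
  }

  // Interprets the status integer written by the Python foreachBatch worker.
  def handleResult(in: DataInputStream): Int = in.readInt() match {
    case 0 => 0 // batch completed successfully
    case PythonExceptionThrown => throw readPythonException(in)
    case other =>
      throw new IllegalStateException(s"Unexpected return value from Python: $other")
  }
}
```

   With a helper like this, the `foreachBatch` path and the regular Python runner path would surface the same exception type, so callers could catch Python-side failures uniformly.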



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
