bogao007 commented on code in PR #42986:
URL: https://github.com/apache/spark/pull/42986#discussion_r1329447168


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
   }
 
   private def handlePythonWorkerError(functionName: String): Unit = {
-    dataIn.readInt() match {
-      case ret if ret == 0 =>
-        logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
-      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-        val exLength = dataIn.readInt()
-        val obj = new Array[Byte](exLength)
-        dataIn.readFully(obj)
-        val msg = new String(obj, StandardCharsets.UTF_8)
-        throw new IllegalStateException(s"Found error inside Streaming query listener Python " +
-          s"process for function $functionName: $msg")
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>

Review Comment:
   If we want to add retry logic, adding it here would not solve the problem. 
If the EOF is caused by an IOException in one of the `write_` functions, we 
might need to add retries inside the workers. But I'm not sure how likely that 
scenario is: if an IOException happens while writing, the socket has most 
likely been closed, and a retry would not help in that case.
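
   To illustrate why a retry on the reading side would not help, here is a 
minimal, self-contained sketch. It is not code from this PR: `EofRetrySketch` 
and `eofCount` are hypothetical names, and an exhausted in-memory stream stands 
in for a socket that the Python worker has already closed. Once the stream has 
reached its end, every further `readInt()` throws `EOFException` again.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException}

object EofRetrySketch {
  // Hypothetical helper: call readInt() `attempts` times and count how many
  // of those attempts end in an EOFException.
  def eofCount(in: DataInputStream, attempts: Int): Int =
    (1 to attempts).count { _ =>
      try {
        in.readInt()
        false
      } catch {
        case _: EOFException => true
      }
    }

  def main(args: Array[String]): Unit = {
    // Assumption: an empty stream models a connection closed by the peer.
    val closedByPeer =
      new DataInputStream(new ByteArrayInputStream(Array.emptyByteArray))
    // Every retry hits EOF, so this prints 3.
    println(eofCount(closedByPeer, 3))
  }
}
```

   A retry could only succeed if the worker re-sent its status on a fresh 
connection, which is why any retry logic would have to live in the worker.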



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

