bogao007 commented on code in PR #42986:
URL: https://github.com/apache/spark/pull/42986#discussion_r1329574367
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener:
SimplePythonFunction, sessionHolder
}
private def handlePythonWorkerError(functionName: String): Unit = {
- dataIn.readInt() match {
- case ret if ret == 0 =>
- logInfo(s"Streaming query listener function $functionName completed
(ret: $ret)")
- case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
- val exLength = dataIn.readInt()
- val obj = new Array[Byte](exLength)
- dataIn.readFully(obj)
- val msg = new String(obj, StandardCharsets.UTF_8)
- throw new IllegalStateException(s"Found error inside Streaming query
listener Python " +
- s"process for function $functionName: $msg")
+ try {
+ dataIn.readInt() match {
+ case ret if ret == 0 =>
+ logInfo(s"Streaming query listener function $functionName completed
(ret: $ret)")
+ case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+ val exLength = dataIn.readInt()
+ val obj = new Array[Byte](exLength)
+ dataIn.readFully(obj)
+ val msg = new String(obj, StandardCharsets.UTF_8)
+ throw new PythonException(s"Found error inside Streaming query
listener Python " +
+ s"process for function $functionName: $msg", null)
+ }
+ } catch {
+ case eof: EOFException =>
Review Comment:
Not sure whether catching any `NonFatal(ex)` is a good solution here, since if
an error occurs it is highly likely that something went wrong in the Python
worker, and it would be better to fail the query in that case. I'll leave it as
it is for now, and I have also added a TODO in `StreamingForeachBatchHelper` to
improve the handling of this scenario later.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging {
dataOut.writeLong(args.batchId)
dataOut.flush()
- val ret = dataIn.readInt()
- logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret:
$ret)")
+ try {
+ dataIn.readInt() match {
+ case ret if ret == 0 =>
+ logInfo(s"Python foreach batch for dfId ${args.dfId} completed
(ret: $ret)")
+ case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+ val exLength = dataIn.readInt()
+ val obj = new Array[Byte](exLength)
+ dataIn.readFully(obj)
+ val msg = new String(obj, StandardCharsets.UTF_8)
+ throw new PythonException(s"Found error inside foreachBatch Python
process: $msg", null)
Review Comment:
Updated.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -38,33 +43,56 @@ class PythonStreamingQueryListener(listener:
SimplePythonFunction, sessionHolder
sessionHolder.sessionId,
"pyspark.sql.connect.streaming.worker.listener_worker")
- val (dataOut, _) = runner.init()
+ val (dataOut, dataIn) = runner.init()
override def onQueryStarted(event:
StreamingQueryListener.QueryStartedEvent): Unit = {
PythonRDD.writeUTF(event.json, dataOut)
dataOut.writeInt(0)
dataOut.flush()
+ handlePythonWorkerError("onQueryStarted")
}
override def onQueryProgress(event:
StreamingQueryListener.QueryProgressEvent): Unit = {
PythonRDD.writeUTF(event.json, dataOut)
dataOut.writeInt(1)
dataOut.flush()
+ handlePythonWorkerError("onQueryProgress")
}
override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit
= {
PythonRDD.writeUTF(event.json, dataOut)
dataOut.writeInt(2)
dataOut.flush()
+ handlePythonWorkerError("onQueryIdle")
}
override def onQueryTerminated(event:
StreamingQueryListener.QueryTerminatedEvent): Unit = {
PythonRDD.writeUTF(event.json, dataOut)
dataOut.writeInt(3)
dataOut.flush()
+ handlePythonWorkerError("onQueryTerminated")
}
private[spark] def stopListenerProcess(): Unit = {
runner.stop()
}
+
+ private def handlePythonWorkerError(functionName: String): Unit = {
+ try {
+ dataIn.readInt() match {
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]