cloud-fan commented on code in PR #33934:
URL: https://github.com/apache/spark/pull/33934#discussion_r1517429300
##########
core/src/main/scala/org/apache/spark/util/Utils.scala:
##########
@@ -3154,6 +3155,89 @@ private[spark] class RedirectThread(
}
}
+/**
+ * PythonRunner process wrapper. This wrapper collects the process's error message,
+ * so that if the process exits with an exception, Spark can get the process exit message.
+ *
+ * @param process PythonRunner python process
+ * @param out Where to redirect the message.
+ * @param name process name.
+ * @param propagateEof If propagate Eof.
+ */
+private[spark] class PythonRunnerProcessWrapper(
+ process: Process,
+ out: OutputStream,
+ name: String,
+ propagateEof: Boolean = false) extends Thread(name) {
+
+ val lock = new ReentrantReadWriteLock()
+ val error = new CircularBuffer(1024 * 1024)
+
+ def errorMessage: String = error.toString()
+
+ private class ProcessRedirectThread(
+ in: InputStream,
+ out: OutputStream,
+ name: String,
+ isStderr: Boolean)
+ extends Thread(name) {
+
+ setDaemon(true)
+ override def run(): Unit = {
+ scala.util.control.Exception.ignoring(classOf[IOException]) {
+ Utils.tryWithSafeFinally {
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ lock.writeLock().lock()
+ while (len > 0) {
+ out.write(buf, 0, len)
+ out.flush()
+ if (isStderr) {
+ error.write(buf, 0, len)
+ error.flush()
+ }
+ if (in.available() > 0) {
+ len = in.read(buf)
Review Comment:
In `RedirectThread`, we just call `len = in.read(buf)`, without checking
`in.available() > 0`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]