cloud-fan commented on code in PR #36065:
URL: https://github.com/apache/spark/pull/36065#discussion_r847897425
##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -646,6 +647,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
}
}
}
+
+ /**
+ * This thread monitors the WriterThread and kills it in case of deadlock.
+ *
+ * A deadlock can arise if the task completes while the writer thread is sending input to the
+ * Python process (e.g. due to the use of `take()`), and the Python process is still producing
+ * output. When the inputs are sufficiently large, this can result in a deadlock due to the use of
+ * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the socket.
+ */
+ class WriterMonitorThread(
+ env: SparkEnv, worker: Socket, writerThread: WriterThread, context:
TaskContext)
+ extends Thread(s"Writer Monitor for $pythonExec (writer thread id
${writerThread.getId})") {
+
+ /**
+ * How long to wait before closing the socket if the writer thread has not
exited after the task
+ * ends.
+ */
+ private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
+
+ setDaemon(true)
+
+ override def run(): Unit = {
+ // Wait until the task is completed (or the writer thread exits, in
which case this thread has
+ // nothing to do).
+ while (!context.isCompleted && writerThread.isAlive) {
+ Thread.sleep(2000)
+ }
+ if (writerThread.isAlive) {
+ Thread.sleep(taskKillTimeout)
+ // If the writer thread continues running, this indicates a deadlock.
Kill the worker to
+ // resolve the deadlock.
+ if (writerThread.isAlive) {
+ try {
+ // Mimic the task name used in `Executor` to help the user find
out the task to blame.
+ val taskName = s"${context.partitionId}.${context.attemptNumber} "
+
+ s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+ logWarning(
+ s"Detected deadlock while completing task $taskName: " +
+ "Attempting to kill Python Worker")
+ env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+ } catch {
+ case e: Exception =>
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]