ankurdave commented on code in PR #36065:
URL: https://github.com/apache/spark/pull/36065#discussion_r844254756
##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -188,10 +188,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
// SPARK-35009: avoid creating multiple monitor threads for the same python worker
// and task context
if (PythonRunner.runningMonitorThreads.add(key)) {
- new MonitorThread(SparkEnv.get, worker, context).start()
+ new MonitorThread(SparkEnv.get, worker, writerThread, context).start()
Review Comment:
Hmm, good catch. The lifetime of the MonitorThread is tied to the lifetime
of the Python worker, so this won't work when the worker is reused.
I think it would be easier to just create a new WriterMonitorThread that is
tied to the lifetime of the writer thread, rather than trying to stuff the
functionality into the MonitorThread.
I'll prototype this and try to craft a unit test that exercises the case you
mentioned. It seems like we should be able to see the effect with something
like `rdd.mapPartitions(lambda iterator: itertools.islice(iterator, 1)).coalesce(1)`. It
might be tricky to actually reproduce the deadlock in that case though.
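The early-exit part of that repro can be tried without a cluster. Below is a minimal sketch, assuming `itertools.islice` is used to take a single element, since the function passed to `mapPartitions` receives a plain Python iterator. The names `take_first` and `upstream` are hypothetical, and the generator only stands in for the rows Spark would feed to the worker; it does not reproduce the deadlock itself.

```python
from itertools import islice

def take_first(iterator, n=1):
    """Return an iterator over at most n elements, leaving the rest unread."""
    return islice(iterator, n)

consumed = []

def upstream():
    # Stand-in for the partition's input rows (hypothetical, not a Spark API).
    for i in range(5):
        consumed.append(i)  # record how far the input was actually read
        yield i

result = list(take_first(upstream()))
print(result)    # elements the partition function returned
print(consumed)  # elements that were pulled from the input
```

Only the first element is pulled from the input, which is the situation where the JVM side may still have unwritten data after the Python side has finished. With a real `SparkContext`, the same function could presumably be passed as `rdd.mapPartitions(take_first).coalesce(1)`.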
cc @attilapiros
##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -613,14 +620,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
setDaemon(true)
private def monitorWorker(): Unit = {
- // Kill the worker if it is interrupted, checking until task completion.
+ // Wait until the task is either canceled or completed.
// TODO: This has a race condition if interruption occurs, as completed may still become true.
while (!context.isInterrupted && !context.isCompleted) {
Review Comment:
Yeah, I think we might want that, though I'm having trouble reasoning about
the effect it would have on worker reuse. We still want the monitor thread's
lifetime to be tied to the Python worker's lifetime.
As I mentioned above, it seems like it would be easier to create a new kind
of monitor thread for this case.
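To make the idea concrete, here is a rough sketch in Python (not Scala, and not Spark's API) of a monitor whose lifetime follows the writer thread rather than the worker. Everything in it is an assumption for illustration: `WriterMonitor`, `task_completed`, `stop_writer`, and the timeout values are hypothetical stand-ins, and a real implementation would have to decide what "stopping the writer" means for a reused worker.

```python
import threading
import time

class WriterMonitor(threading.Thread):
    """Hypothetical monitor that lives only as long as the writer thread."""

    def __init__(self, writer, task_completed, stop_writer, grace=0.05, poll=0.01):
        super().__init__(daemon=True)
        self.writer = writer                  # thread being watched
        self.task_completed = task_completed  # threading.Event set when the task ends
        self.stop_writer = stop_writer        # callback that unblocks the writer
        self.grace = grace                    # seconds to wait after task completion
        self.poll = poll                      # polling interval in seconds
        self.intervened = False

    def run(self):
        # Exit as soon as the writer exits; otherwise wait for task completion.
        while self.writer.is_alive() and not self.task_completed.is_set():
            time.sleep(self.poll)
        if not self.writer.is_alive():
            return
        # The task is done but the writer is still running: allow a grace period.
        self.writer.join(self.grace)
        if self.writer.is_alive():
            self.intervened = True
            self.stop_writer()

# Simulate a writer that stays blocked after the task has already completed.
unblock = threading.Event()
task_completed = threading.Event()
writer = threading.Thread(target=unblock.wait, daemon=True)
monitor = WriterMonitor(writer, task_completed, unblock.set)
writer.start()
monitor.start()
task_completed.set()
monitor.join(2)
writer.join(2)
print(monitor.intervened, writer.is_alive())
```

Because the monitor's loop is keyed on the writer being alive, it goes away with the writer and never outlives a single task, which is what keeps it separate from the worker-scoped MonitorThread.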
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]