ankurdave commented on code in PR #36065:
URL: https://github.com/apache/spark/pull/36065#discussion_r844254756


##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -188,10 +188,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       // SPARK-35009: avoid creating multiple monitor threads for the same 
python worker
       // and task context
       if (PythonRunner.runningMonitorThreads.add(key)) {
-        new MonitorThread(SparkEnv.get, worker, context).start()
+        new MonitorThread(SparkEnv.get, worker, writerThread, context).start()

Review Comment:
   Hmm, good catch. The lifetime of the MonitorThread is tied to the lifetime 
of the Python worker, so this won't work when the worker is reused.
   
   I think it would be easier to just create a new WriterMonitorThread that is 
tied to the lifetime of the writer thread, rather than trying to stuff the 
functionality into the MonitorThread.
   
   I'll prototype this and try to craft a unit test that exercises the case you 
mentioned. It seems like we should be able to see the effect with something 
like `rdd.mapPartitions(lambda iterator: itertools.islice(iterator, 1)).coalesce(1)` 
(plain Python iterators don't have a `take` method). It might be tricky to 
actually reproduce the deadlock in that case though.
   
   cc @attilapiros 
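
   To make the idea concrete, here is a rough Python sketch of the lifetime I have in mind, using plain `threading` stand-ins rather than Spark's actual classes (all names here are illustrative, not the real API): a monitor tied to the writer thread's lifetime, which unblocks a writer that is stuck because the worker stopped reading early (e.g. after `take(1)`).

   ```python
   # Hypothetical sketch, not Spark's actual implementation: a monitor whose
   # lifetime matches the writer thread, not the Python worker.
   import threading
   import time

   task_completed = threading.Event()   # stands in for TaskContext completion
   writer_unblocked = threading.Event()

   def writer():
       # Simulates a writer blocked on a socket write to a worker that has
       # stopped reading; a real monitor would close the socket instead.
       writer_unblocked.wait(timeout=60)

   writer_thread = threading.Thread(target=writer, daemon=True)
   writer_thread.start()

   def writer_monitor():
       # Lives only as long as the writer thread, so worker reuse is unaffected.
       while not task_completed.is_set() and writer_thread.is_alive():
           time.sleep(0.05)
       if writer_thread.is_alive():
           writer_unblocked.set()  # stands in for killing the blocked write

   monitor = threading.Thread(target=writer_monitor, daemon=True)
   monitor.start()

   task_completed.set()            # the task finishes early, as with take(1)
   writer_thread.join(timeout=5)
   assert not writer_thread.is_alive()
   ```

   The point of the sketch is only the ownership: the new thread watches the writer, so it goes away with the writer and never outlives a reused worker.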



##########
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:
##########
@@ -613,14 +620,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     setDaemon(true)
 
     private def monitorWorker(): Unit = {
-      // Kill the worker if it is interrupted, checking until task completion.
+      // Wait until the task is either canceled or completed.
       // TODO: This has a race condition if interruption occurs, as completed 
may still become true.
       while (!context.isInterrupted && !context.isCompleted) {

Review Comment:
   Yeah, I think we might want that, though I'm having trouble reasoning about 
the effect it would have on worker reuse. We still want the monitor thread's 
lifetime to be tied to the Python worker's lifetime.
   
   As I mentioned above, it seems like it would be easier to create a new kind 
of monitor thread for this case.
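
   For illustration, a rough `threading` sketch (hypothetical names, not the real `TaskContext`) of why the loop condition determines the lifetime: a monitor that exits only on cancellation survives task completion and can be shared across tasks on a reused worker (the SPARK-35009 dedup), whereas one that also exits on completion would have to be recreated per task.

   ```python
   # Hypothetical sketch contrasting the two loop conditions.
   import threading
   import time

   is_interrupted = threading.Event()  # stands in for context.isInterrupted
   is_completed = threading.Event()    # stands in for context.isCompleted

   def worker_monitor():
       # Worker-lifetime: only cancellation ends it, so one monitor can
       # serve every task that reuses this worker.
       while not is_interrupted.is_set():
           time.sleep(0.05)

   def task_monitor():
       # Task-lifetime: exits on completion too, so it is gone once the
       # task finishes and would need to be recreated for the next task.
       while not is_interrupted.is_set() and not is_completed.is_set():
           time.sleep(0.05)

   wm = threading.Thread(target=worker_monitor, daemon=True)
   tm = threading.Thread(target=task_monitor, daemon=True)
   wm.start()
   tm.start()

   is_completed.set()        # the task finishes normally
   tm.join(timeout=5)
   assert not tm.is_alive()  # task-lifetime monitor is gone
   assert wm.is_alive()      # worker-lifetime monitor lives on
   ```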



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

