attilapiros commented on pull request #32169:
URL: https://github.com/apache/spark/pull/32169#issuecomment-820395723


   @srowen 
   
   In `MonitorThread` this is the interesting part for us:
   
https://github.com/apache/spark/blob/4f1b687c1a6f6f0d3e0dabca0d88a78e4315f9d6/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L582-L597
   
   So when the task is completed, this `MonitorThread` does nothing interesting: it just 
stops without doing anything further. 
   Its main purpose is to handle task interruptions. 
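   
   To make that control flow concrete, here is a condensed, hedged sketch of what the linked loop boils down to (names such as `taskCompleted` and `destroyWorker` are stand-ins for the real `TaskContext`/`SparkEnv` calls, not the actual `PythonRunner` code):

```scala
// Condensed illustration of the MonitorThread behaviour described above (a
// sketch, not the real PythonRunner code): poll until the task either
// completes or gets interrupted; only the interrupted case touches the worker.
class MonitorThreadSketch(
    taskCompleted: () => Boolean,   // stand-in for context.isCompleted()
    taskInterrupted: () => Boolean, // stand-in for context.isInterrupted()
    destroyWorker: () => Unit       // stand-in for env.destroyPythonWorker(...)
  ) extends Thread("Worker Monitor Sketch") {
  setDaemon(true)

  override def run(): Unit = {
    // Wait until the task finishes one way or the other.
    while (!taskCompleted() && !taskInterrupted()) {
      Thread.sleep(100)
    }
    if (!taskCompleted()) {
      // Interrupted before completion: this is the only case with a side effect.
      destroyWorker()
    }
    // Completed task: fall through and stop without doing anything.
  }
}
```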
   
   So before this PR, when the task was interrupted, multiple 
`MonitorThread`s called `destroyPythonWorker` for the same socket, which 
delegates to
   
https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/SparkEnv.scala#L128
   
   We know the key is the same here, since the socket was the same. 
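   
   To illustrate the lookup, a minimal sketch (field and method names only approximate `SparkEnv`; this is not the actual implementation): every call with the same `pythonExec` and `envVars` resolves to the same cached factory, which then receives the same socket.

```scala
import java.net.Socket
import scala.collection.mutable

// Hedged sketch of the destroyPythonWorker lookup: factories are cached by
// (pythonExec, envVars), so every monitor thread started for the same python
// worker hits the same factory and hands it the same Socket.
object DestroyWorkerSketch {
  trait WorkerFactoryLike { def stopWorker(worker: Socket): Unit }

  private val pythonWorkers =
    mutable.HashMap.empty[(String, Map[String, String]), WorkerFactoryLike]

  def destroyPythonWorker(
      pythonExec: String,
      envVars: Map[String, String],
      worker: Socket): Unit = synchronized {
    val key = (pythonExec, envVars)
    // Same exec + env => same factory; stopWorker is invoked with the same
    // socket once per monitor thread.
    pythonWorkers.get(key).foreach(_.stopWorker(worker))
  }
}
```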
   
   Going further down the road we reach `stopWorker`:
   
https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L344-L361
   
   As I see it, `useDaemon` must be true, because `create` only reuses sockets when 
`useDaemon` is true:
   
https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L102
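   
   A rough sketch of that `create` path (stub bodies, just to show why socket reuse is tied to `useDaemon`; not the real factory code):

```scala
import java.net.Socket
import scala.collection.mutable

// Sketch of PythonWorkerFactory.create's shape: only the daemon branch keeps
// a pool of idle sockets to hand back out, so a worker socket can only be
// shared/reused when useDaemon is true.
class WorkerFactorySketch(useDaemon: Boolean) {
  private val idleWorkers = mutable.Queue.empty[Socket]

  def create(): Socket = {
    if (useDaemon) {
      synchronized {
        if (idleWorkers.nonEmpty) {
          return idleWorkers.dequeue() // reuse a previously released socket
        }
      }
      createThroughDaemon()            // otherwise ask the daemon to fork a worker
    } else {
      createSimpleWorker()             // non-daemon mode: a fresh process every time
    }
  }

  private def createThroughDaemon(): Socket = new Socket() // stub
  private def createSimpleWorker(): Socket = new Socket()  // stub
}
```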
   
   Now back to `stopWorker`. We can see it just sends a PID, but that PID 
comes from a `HashMap` where the socket is the key:
   
https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L83
   
   So multiple monitor threads for the same socket will just send the same PID 
multiple times via `daemon.getOutputStream`, which is the stdin of the daemon 
process. (The `TaskContext`/taskAttemptId is only needed for my 
`runningMonitorThreads` to track each task's interruption separately.)
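   
   In other words, the daemon branch of `stopWorker` behaves roughly like this sketch (`daemonOut` stands in for `daemon.getOutputStream` and `daemonWorkers` mirrors the socket-to-PID map; this is an illustration, not the real code):

```scala
import java.io.{DataOutputStream, OutputStream}
import java.net.Socket
import scala.collection.mutable

// Sketch of the daemon branch of stopWorker: look up the PID by socket and
// write it to the daemon's stdin. Two monitor threads holding the same socket
// therefore write exactly the same PID twice.
class StopWorkerSketch(daemonOut: OutputStream) {
  private val daemonWorkers = mutable.HashMap.empty[Socket, Int]

  def stopWorker(worker: Socket): Unit = synchronized {
    daemonWorkers.get(worker).foreach { pid =>
      val out = new DataOutputStream(daemonOut)
      out.writeInt(pid) // same socket => same pid, however many times this runs
      out.flush()
    }
    worker.close()
  }
}
```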
   
   Let's see what happens on the daemon side:
   
https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/python/pyspark/daemon.py#L126-L135
   
   So we just send a SIGKILL to the PID that arrived via stdin. 
   Doing this is really redundant for the same PID, but those errors (sending a kill to a 
nonexistent PID) are ignored by `except OSError:`.
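   
   Just to show why the duplication is harmless, here is a small sketch of the idea in Scala (the real handler is the Python code in `daemon.py` linked above): a second SIGKILL aimed at a PID that is already gone simply fails, and the failure is ignored.

```scala
import scala.sys.process._

// Illustration only (the real logic lives in python/pyspark/daemon.py):
// sending SIGKILL to a PID that no longer exists just fails, and the failure
// is swallowed, mirroring the `except OSError:` branch on the daemon side.
object KillWorkerSketch {
  def killWorker(pid: Int): Unit = {
    // `kill -9` exits non-zero for a nonexistent PID; the exit code is ignored.
    // Requires a POSIX `kill` binary on the PATH.
    Seq("kill", "-9", pid.toString).!
    ()
  }
}
```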

