attilapiros commented on pull request #32169: URL: https://github.com/apache/spark/pull/32169#issuecomment-820395723
@srowen In `MonitorThread` this is the interesting part for us:

https://github.com/apache/spark/blob/4f1b687c1a6f6f0d3e0dabca0d88a78e4315f9d6/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L582-L597

So when the task completes, the monitor thread does nothing interesting; it just stops. Its main purpose is to handle task interruptions.

Before this PR, when a task was interrupted, multiple `MonitorThread`s called `destroyPythonWorker` for the same socket, which delegates to:

https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/SparkEnv.scala#L128

We know the key is the same here because the socket is the same. Going further down the road we reach `stopWorker`:

https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L344-L361

As far as I can see, `useDaemon` must be true, because sockets are only reused when `useDaemon` is true:

https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L102

Now back to `stopWorker`. We can see it just sends a pid. That pid comes from a `HashMap` where the socket is the key:

https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L83

So multiple monitor threads for the same socket will just send the same pid multiple times via `daemon.getOutputStream`, which is the stdin of the daemon process. (The task context/`taskAttemptID` is only needed for my `runningMonitorThreads`, to monitor each task's interruption separately.)

Let's see what happens on the daemon side:

https://github.com/apache/spark/blob/c4a5e2dfa38d754f92ea6f4b98f549b7d6108639/python/pyspark/daemon.py#L126-L135

So we are just sending a SIGKILL to the PID that arrived via stdin.
This is really redundant for the same pid. Those errors (sending kill to a nonexisting PID) are ignored by `except OSError:`.
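
To make the daemon-side behaviour concrete, here is a minimal Python sketch of the pattern described above. It is not the actual `daemon.py` code: `drain_kill_requests` is a hypothetical helper, the 4-byte big-endian pid encoding is an assumption about the wire format, and the `kill` parameter is only there so the loop can be exercised without touching real processes (it assumes a POSIX platform where `signal.SIGKILL` exists).

```python
import io
import os
import signal
import struct


def drain_kill_requests(stream, kill=os.kill):
    """Read pids from `stream` and send SIGKILL to each one.

    Hypothetical helper, not Spark code. Assumes each pid arrives as a
    4-byte big-endian signed int. Returns (delivered, ignored) pid lists.
    """
    delivered, ignored = [], []
    while True:
        raw = stream.read(4)
        if len(raw) < 4:
            break  # end of stream: no more kill requests
        pid = struct.unpack("!i", raw)[0]
        try:
            kill(pid, signal.SIGKILL)
            delivered.append(pid)
        except OSError:
            # The process is already gone, so the repeated request is a no-op.
            ignored.append(pid)
    return delivered, ignored


def demo():
    # Simulate one live worker and three monitor threads that all send its pid.
    live = {4242}

    def fake_kill(pid, sig):
        if pid not in live:
            raise ProcessLookupError(pid)  # subclass of OSError
        live.discard(pid)

    stream = io.BytesIO(struct.pack("!i", 4242) * 3)
    return drain_kill_requests(stream, kill=fake_kill)
```

In this sketch only the first request has any effect; the two duplicates fall into the `except OSError:` branch and are dropped, which is why the duplicate `destroyPythonWorker` calls were redundant rather than harmful.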
