st-tran opened a new pull request, #52915: URL: https://github.com/apache/spark/pull/52915
### What changes were proposed in this pull request?

This diff addresses the synchronization issue described in SPARK-54217 by respecting the existing `releasedOrClosed` `AtomicBoolean` in the `PythonRunner`'s kill codepath. That flag is currently only used in the "released" codepath, not the "closed" one. In doing so, we avoid erroneously destroying a still-healthy Python worker; in the current state, it will be destroyed and a new one will be created.

[Jira ticket description follows...](https://issues.apache.org/jira/browse/SPARK-54217)

[PythonWorkerFactory](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala) in daemon mode will allow for worker reuse, where possible, as long as the worker successfully completed its last-assigned task (via [releasePythonWorker](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650)). The worker will be released into the idle queue to be picked up by the next [createPythonWorker](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302) call.

However, there is a race condition that can result in a released worker in the [PythonWorkerFactory idle queue](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116) getting killed. That is, `PythonRunner` lacks synchronization between:

1. the main task thread's decision to release its associated Python worker (when work is complete), and
2. the `MonitorThread`'s decision to kill the associated Python worker (when requested by the executor, e.g. speculative execution where another attempt succeeds).

So, the following sequence of events is possible:

1. `PythonRunner` is running
2. The Python worker finishes its work and writes `END_OF_STREAM` to signal back to `PythonRunner`'s main task thread that it is done
3. [`PythonRunner`'s main task thread receives this instruction and releases the worker for reuse](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647)
4. For a separate reason: Executor decides to kill this task (e.g. speculative execution)
5. [`PythonRunner`'s `MonitorThread` receives this instruction and kills the already-relinquished `PythonWorker`](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696)

So the next task that pulls this Python worker from the idle pool will have a dead Python worker.

### Why are the changes needed?

In the latest Spark release, this change is NOT critical; however, it avoids the unnecessary killing of a still-healthy Python worker, which results in another one being created.

* Prior to [SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), this would result in a crash (failing the task) as we would reuse this now-closed worker.
* With [SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), this is less of an issue, as we check that the worker is alive before we use it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

It is not possible (or very hard) to unit test this change. However, I've created a minimal repro of the issue. The error occurs when running without this change, and goes away with this change.

* Without this change: details in SPARK-54217; we see the following log line indicating that we are attempting to reuse killed Python workers:
  * `PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle queue is dead, discarding.`
* With this change:
  * First, cherry-pick [this PR](https://github.com/st-tran/spark/pull/1) onto this fix.
  * Then, rerun the job with this new build and we see that the errors go away:

```
sttran@Stevens-MacBook-Pro ~/sttran-spark/work % ls -lathr
total 0
drwxr-xr-x@ 18 sttran staff 576B Nov 5 21:22 app-20251105212207-0012 # Without the fix (has errors shown in https://github.com/st-tran/spark/pull/1)
drwxr-xr-x@ 53 sttran staff 1.7K Nov 5 22:22 ..
drwxr-xr-x@ 4 sttran staff 128B Nov 5 22:29 .
drwxr-xr-x@ 18 sttran staff 576B Nov 5 22:29 app-20251105222956-0013 # With the fix (no errors per search below)
sttran@Stevens-MacBook-Pro ~/sttran-spark/work % grep -nri discard *0013
sttran@Stevens-MacBook-Pro ~/sttran-spark/work %
```

### Was this patch authored or co-authored using generative AI tooling?

No.
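
### Sketch of the guarded release/kill decision

The race between the release path (step 3) and the kill path (step 5) can be illustrated with a small model. This is only a sketch under stated assumptions, not Spark's actual code: `SketchWorker`, `SketchIdlePool`, `SketchRunner`, and the `guardKill` switch are hypothetical names introduced for illustration. It assumes that both paths claim the shared `releasedOrClosed` flag with `AtomicBoolean.compareAndSet`, so that at most one of them acts on the worker.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a Python worker process (not Spark's PythonWorker).
final class SketchWorker(val id: Int) {
  @volatile private var alive = true
  def isAlive: Boolean = alive
  def destroy(): Unit = { alive = false }
}

// Hypothetical stand-in for the factory's idle queue.
final class SketchIdlePool {
  private val idle = new ConcurrentLinkedQueue[SketchWorker]()
  def release(w: SketchWorker): Unit = idle.add(w)
  def poll(): Option[SketchWorker] = Option(idle.poll())
}

// Hypothetical runner. With guardKill = false the kill path ignores the flag,
// which models the behavior described as the current state; with
// guardKill = true the kill path must win the flag before touching the worker.
final class SketchRunner(worker: SketchWorker, pool: SketchIdlePool, guardKill: Boolean) {
  private val releasedOrClosed = new AtomicBoolean(false)

  // Main task thread: the worker signalled completion, so hand it back for reuse.
  def releaseWorker(): Unit =
    if (releasedOrClosed.compareAndSet(false, true)) pool.release(worker)

  // Monitor thread: the executor asked for this task to be killed.
  def killWorker(): Unit =
    if (!guardKill || releasedOrClosed.compareAndSet(false, true)) worker.destroy()
}

object RaceSketch {
  // Replays "release, then kill" in order and reports whether the worker
  // sitting in the idle pool is still alive afterwards.
  def pooledWorkerSurvives(guardKill: Boolean): Boolean = {
    val pool = new SketchIdlePool
    val runner = new SketchRunner(new SketchWorker(1), pool, guardKill)
    runner.releaseWorker() // step 3: worker goes to the idle pool
    runner.killWorker()    // step 5: late kill request arrives
    pool.poll().exists(_.isAlive)
  }

  def main(args: Array[String]): Unit = {
    println(s"unguarded kill: pooled worker alive = ${pooledWorkerSurvives(guardKill = false)}")
    println(s"guarded kill:   pooled worker alive = ${pooledWorkerSurvives(guardKill = true)}")
  }
}
```

The sketch replays the interleaving on a single thread so that it is deterministic. In the two-thread case the same reasoning applies: `compareAndSet(false, true)` succeeds for exactly one caller, so either the worker is released and left alone, or it is killed and never enters the idle pool.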
