st-tran opened a new pull request, #52915:
URL: https://github.com/apache/spark/pull/52915

   ### What changes were proposed in this pull request?
   This diff addresses the synchronization issue described in SPARK-54217 by 
respecting the existing releasedOrClosed AtomicBoolean in the PythonRunner's 
kill codepath. The flag is currently only used in the "released" codepath, not 
the "closed" one. Respecting it in both avoids erroneously destroying a 
still-healthy Python worker; in the current state, the worker is destroyed and 
a new one is created.
   
   [Jira ticket description 
follows...](https://issues.apache.org/jira/browse/SPARK-54217)
   
   
[PythonWorkerFactory](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala)
 in daemon mode will allow for worker reuse, where possible, as long as the 
worker successfully completed its last-assigned task (via 
[releasePythonWorker](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L650)).
 The worker will be released into the idle queue to be picked up by the next 
[createPythonWorker](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L302)
 call.
   
   However, there is a race condition that can result in a released worker in 
the [PythonWorkerFactory idle 
queue](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L116)
 getting killed. Specifically, the `PythonRunner` lacks synchronization between:
   1. the main task thread's decision to release its associated Python worker 
(when work is complete), and
   2. the `MonitorThread`'s decision to kill the associated Python worker (when 
requested by the executor, e.g. speculative execution where another attempt 
succeeds).
   
   So, the following sequence of events is possible:
   1. `PythonRunner` is running
   2. The Python worker finishes its work and writes `END_OF_STREAM` to signal 
back to `PythonRunner`'s main task thread that it is done
   3. [`PythonRunner`'s main task thread receives this instruction and releases 
the worker for 
reuse](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L647)
   4. For a separate reason, the executor decides to kill this task (e.g. 
speculative execution)
   5. [`PythonRunner`'s `MonitorThread` receives this instruction and kills the 
already-relinquished 
`PythonWorker`](https://github.com/apache/spark/blob/4202f239c45a290e340bcd505de849d876f992fa/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L696)
   
   So the next task that pulls this Python worker from the idle pool will have 
a dead Python worker.
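   The sequence above can be modeled with a minimal Java sketch. This is not 
the code in `PythonRunner.scala`: `SketchWorker`, `WorkerHandoff`, and all 
method names are hypothetical, and the use of `compareAndSet` is an assumption 
about how "respecting" the `releasedOrClosed` flag could look. The unguarded 
kill destroys a worker that was already handed to the idle queue; the guarded 
kill backs off once the release has claimed the flag.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a Python worker; not Spark's PythonWorker class.
final class SketchWorker {
    private volatile boolean alive = true;
    boolean isAlive() { return alive; }
    void destroy() { alive = false; }
}

// Hypothetical model of the hand-off between the task thread and the monitor thread.
final class WorkerHandoff {
    private final SketchWorker worker = new SketchWorker();
    // Shared flag: whoever flips it first owns the worker's fate.
    private final AtomicBoolean releasedOrClosed = new AtomicBoolean(false);
    private volatile boolean inIdleQueue = false;

    // Task thread: the worker finished cleanly, so offer it for reuse.
    boolean releaseForReuse() {
        if (releasedOrClosed.compareAndSet(false, true)) {
            inIdleQueue = true;
            return true;
        }
        return false;
    }

    // Monitor thread without the guard: destroys the worker unconditionally,
    // even if it has already been released into the idle queue.
    void killUnguarded() {
        worker.destroy();
    }

    // Monitor thread with the guard: destroys the worker only if it wins the flag.
    boolean killGuarded() {
        if (releasedOrClosed.compareAndSet(false, true)) {
            worker.destroy();
            return true;
        }
        return false;
    }

    // True when the worker sits in the idle queue and is still usable.
    boolean idleWorkerIsAlive() {
        return inIdleQueue && worker.isAlive();
    }
}
```

Because both paths go through the same atomic flag, exactly one of "release" 
and "kill" takes effect for a given worker, regardless of which thread gets 
there first.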
   
   ### Why are the changes needed?
   In the latest Spark release, this change is NOT critical. However, it avoids 
the unnecessary killing of a still-healthy Python worker, which results in 
another one being created.
   
   * Prior to [SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), 
this would result in a crash (failing the task) as we would reuse this 
now-closed worker.
   * With [SPARK-47565](https://issues.apache.org/jira/browse/SPARK-47565), 
this is less of an issue, as we check that the worker is alive before we use it.
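   The liveness check described in the second bullet can be illustrated with 
a small Java sketch. It is only an assumption-laden model, not the 
`PythonWorkerFactory` implementation: `IdleWorkerPool`, `Worker`, and the 
`discardedCount` counter are hypothetical names, and the sketch is 
single-threaded for brevity. Dead workers pulled from the idle queue are 
dropped, and a fresh worker is created when no live one remains, which is the 
extra cost this PR avoids.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical idle pool; not thread-safe, for illustration only.
final class IdleWorkerPool {
    // Hypothetical worker handle with a simple liveness flag.
    static final class Worker {
        private boolean alive = true;
        boolean isAlive() { return alive; }
        void kill() { alive = false; }
    }

    private final Deque<Worker> idle = new ArrayDeque<>();
    private int discarded = 0;

    // Return a worker to the pool after its task completes.
    void release(Worker w) {
        idle.addLast(w);
    }

    // Hand out the first live idle worker, discarding dead ones on the way;
    // fall back to creating a new worker if the queue runs out.
    Worker acquire() {
        Worker w;
        while ((w = idle.pollFirst()) != null) {
            if (w.isAlive()) {
                return w;
            }
            discarded++; // corresponds to a "dead, discarding" event
        }
        return new Worker();
    }

    int discardedCount() {
        return discarded;
    }
}
```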
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   
   It is very hard, if not impossible, to unit test this change.
   
   However, I've created a minimal repro of the issue. The error occurs when 
running without this change, and goes away with this change.
   * Without this change: details in SPARK-54217; we see the following log line 
indicating that we are attempting to reuse killed Python workers:
     * `PythonWorker(java.nio.channels.SocketChannel[closed]) process from idle 
queue is dead, discarding.`
   * With this change:
     * First, cherry-pick [this PR](https://github.com/st-tran/spark/pull/1) 
onto this fix.
     * Then, rerun the job with this new build and we see that the errors go 
away:
   
   ```
   sttran@Stevens-MacBook-Pro ~/sttran-spark/work
    % ls -lathr
   total 0
   drwxr-xr-x@ 18 sttran  staff   576B Nov  5 21:22 app-20251105212207-0012  # Without the fix (has errors shown in https://github.com/st-tran/spark/pull/1)
   drwxr-xr-x@ 53 sttran  staff   1.7K Nov  5 22:22 ..
   drwxr-xr-x@  4 sttran  staff   128B Nov  5 22:29 .
   drwxr-xr-x@ 18 sttran  staff   576B Nov  5 22:29 app-20251105222956-0013  # With the fix (no errors per search below)
   sttran@Stevens-MacBook-Pro ~/sttran-spark/work
    % grep -nri discard *0013
   sttran@Stevens-MacBook-Pro ~/sttran-spark/work
    %
   ```
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

