ankurdave opened a new pull request #34245:
URL: https://github.com/apache/spark/pull/34245


   ### What changes were proposed in this pull request?
   
   Python UDFs in Spark SQL are run in a separate Python process. The Python 
process is fed input by a dedicated thread (`BasePythonRunner.WriterThread`). 
This writer thread drives the child plan by pulling rows from its output 
iterator and serializing them across a socket.
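
   A minimal sketch of this writer-thread pattern (the class and field names here are illustrative stand-ins, not Spark's actual `BasePythonRunner.WriterThread` implementation):

```scala
import java.io.DataOutputStream

// Illustrative sketch: a dedicated thread drains the child plan's output
// iterator and serializes each row across the socket to the Python worker.
class WriterThreadSketch(input: Iterator[Array[Byte]], out: DataOutputStream)
    extends Thread("python-udf-writer") {
  override def run(): Unit = {
    try {
      while (!isInterrupted && input.hasNext) {
        val row = input.next() // may be backed by off-heap memory
        out.writeInt(row.length)
        out.write(row)
      }
    } finally {
      out.flush()
    }
  }
}
```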
   
   When the child exec node is the off-heap vectorized Parquet reader, these 
rows are backed by off-heap memory. The child node uses a task completion 
listener to free the off-heap memory at the end of the task, which invalidates 
the output iterator and any rows it has produced. Since task completion 
listeners are registered bottom-up and executed in reverse order of 
registration, this is safe as long as an exec node never accesses its input 
after its task completion listener has executed.[^1]
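
   To make the ordering concrete, here is a minimal sketch using Spark's `TaskContext` API; the listener bodies and comments are illustrative:

```scala
import org.apache.spark.TaskContext

// Inside a running task: listeners fire in reverse order of registration,
// so the child exec node, which registers first (bottom-up), is cleaned
// up last.
val ctx = TaskContext.get()

// Registered first, e.g. by the off-heap vectorized Parquet reader:
ctx.addTaskCompletionListener[Unit] { _ =>
  // Runs second: by now no parent may touch the output rows, so the
  // off-heap buffers backing them can be freed.
}

// Registered second, by a parent exec node:
ctx.addTaskCompletionListener[Unit] { _ =>
  // Runs first: the parent must stop consuming its input before returning.
}
```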
   
   The BasePythonRunner task completion listener violates this assumption. It 
interrupts the writer thread but does not wait for it to exit, causing a race 
condition that can crash the executor (sketched in miniature after the list):
   1. The Python writer thread is processing a row backed by off-heap memory.
   2. The task finishes, for example because it has reached a row limit.
   3. The BasePythonRunner task completion listener sets the interrupt status 
of the writer thread, but the writer thread does not check it immediately.
   4. The child plan's task completion listener frees its off-heap memory, 
invalidating the row that the Python writer thread is processing.
   5. The Python writer thread attempts to access the invalidated row. The 
use-after-free triggers a segfault that crashes the executor.
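
   The same race in miniature, as a self-contained sketch using raw off-heap memory (all names are illustrative; this is an analogue, not Spark's actual code). The step comments mirror the numbering above:

```scala
import sun.misc.Unsafe

// A writer thread reads a "row" held in off-heap memory; cleanup interrupts
// the thread but frees the memory without waiting for the thread to exit.
object RaceSketch {
  private val unsafe: Unsafe = {
    val f = classOf[Unsafe].getDeclaredField("theUnsafe")
    f.setAccessible(true)
    f.get(null).asInstanceOf[Unsafe]
  }

  def main(args: Array[String]): Unit = {
    val addr = unsafe.allocateMemory(8) // the "row"

    val writer = new Thread(() => {
      while (!Thread.currentThread().isInterrupted) {
        unsafe.getLong(addr) // step 1: processing the row
      }
    })
    writer.start()
    Thread.sleep(10)         // step 2: the task finishes

    writer.interrupt()       // step 3: interrupt, but do not wait
    unsafe.freeMemory(addr)  // step 4: free while the writer may still read
    // step 5: a read after the free is undefined behavior and can segfault
  }
}
```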
   
   This PR fixes the bug by making the BasePythonRunner task completion 
listener wait for the writer thread to exit before returning. This prevents 
its input from being invalidated while the thread is running. The sequence of 
events is now as follows (the fixed listener shape is sketched after the 
list):
   1. The Python writer thread is processing a row backed by off-heap memory.
   2. The task finishes, for example because it has reached a row limit.
   3. The BasePythonRunner task completion listener sets the interrupt status 
of the writer thread and waits for the writer thread to exit.
   4. The child plan's task completion listener can safely free its off-heap 
memory without invalidating live rows.
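
   The fixed listener shape, sketched with an illustrative helper (`registerWriterCleanup` and `writerThread` are stand-ins, not the actual `BasePythonRunner` code):

```scala
import org.apache.spark.TaskContext

// Interrupt the writer, then block until it has exited, so that
// earlier-registered listeners (the child's) can free memory safely.
def registerWriterCleanup(writerThread: Thread): Unit = {
  TaskContext.get().addTaskCompletionListener[Unit] { _ =>
    writerThread.interrupt()
    writerThread.join() // wait for the writer to exit before returning
  }
}
```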
   
   [^1]: This guarantee was not historically recognized, leading to similar 
bugs as far back as 2014 
([SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019?focusedCommentId=13953661&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13953661)).
 The root cause was the lack of a reliably-ordered mechanism for operators to 
free resources at the end of a task. Such a mechanism (task completion 
listeners) was added and gradually refined, and we can now make this guarantee 
explicit.
   
   ### Why are the changes needed?
   
   Without this PR, attempting to use Python UDFs while the off-heap vectorized 
Parquet reader is enabled (`spark.sql.columnVector.offheap.enabled true`) can 
cause executors to segfault.
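
   For reference, a minimal way to enable this setting for a session; the config key is the one named above, everything else is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Enable the off-heap vectorized Parquet reader for this session.
val spark = SparkSession.builder()
  .appName("offheap-columnvector") // illustrative app name
  .config("spark.sql.columnVector.offheap.enabled", "true")
  .getOrCreate()
```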
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   A [previous PR](https://github.com/apache/spark/pull/30177) reduced the 
likelihood of encountering this race condition, but did not eliminate it. The 
accompanying tests were therefore flaky and had to be disabled. This PR 
eliminates the race condition, allowing us to re-enable these tests. One of the 
tests, `test_pandas_udf_scalar`, previously failed in 30 of 1000 runs and now 
always succeeds.
   
   An internal workload previously failed with a segfault about 40% of the time 
when run with `spark.sql.columnVector.offheap.enabled true`, and now succeeds 
100% of the time.

