[ https://issues.apache.org/jira/browse/SPARK-37088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ankur Dave updated SPARK-37088:
-------------------------------
    Description: 
Python UDFs in Spark SQL are run in a separate Python process. The Python 
process is fed input by a dedicated thread (`BasePythonRunner.WriterThread`). 
This writer thread drives the child plan by pulling rows from its output 
iterator and serializing them across a socket.
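
For illustration, a hedged sketch of that pattern (not the actual Spark source;
`rowIter` and `pyOut` are hypothetical stand-ins for the child plan's output
iterator and the stream to the Python worker):

{code:scala}
// Hedged sketch of the writer-thread pattern, not Spark source.
// `rowIter` and `pyOut` are hypothetical stand-ins.
import java.io.DataOutputStream

class WriterThreadSketch(
    rowIter: Iterator[Array[Byte]],   // child plan's output iterator
    pyOut: DataOutputStream           // socket stream to the Python worker
  ) extends Thread("python-udf-writer") {
  setDaemon(true)

  override def run(): Unit = {
    try {
      // Each next() drives the child plan; with the off-heap vectorized
      // reader, the returned row is backed by off-heap memory.
      while (rowIter.hasNext && !isInterrupted) {
        val row = rowIter.next()
        pyOut.writeInt(row.length)
        pyOut.write(row)              // serialize the row across the socket
      }
      pyOut.flush()
    } catch {
      case _: InterruptedException | _: java.io.IOException =>
        // task ended or stream closed; stop feeding the worker
    }
  }
}
{code}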

When the child exec node is the off-heap vectorized Parquet reader, these rows 
are backed by off-heap memory. The child node uses a task completion listener 
to free the off-heap memory at the end of the task, which invalidates the 
output iterator and any rows it has produced. Since task completion listeners 
are registered bottom-up and executed in reverse order of registration, this is 
safe as long as an exec node never accesses its input after its task completion 
listener has executed.
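
The ordering contract can be seen with the real TaskContext listener API. A
minimal sketch, assuming a local SparkContext named `sc`:

{code:scala}
// Minimal sketch of the listener ordering contract, assuming a local
// SparkContext named `sc`. Listeners run in reverse registration order.
import org.apache.spark.TaskContext

sc.parallelize(Seq(1), numSlices = 1).foreachPartition { _ =>
  val ctx = TaskContext.get()
  // The child scan node registers first (bottom-up), so it runs LAST:
  ctx.addTaskCompletionListener[Unit] { _ =>
    println("scan node: free off-heap column vectors")
  }
  // BasePythonRunner registers later, so its listener runs FIRST:
  ctx.addTaskCompletionListener[Unit] { _ =>
    println("python runner: interrupt writer thread")
  }
}
// Per-task output: "python runner: interrupt writer thread"
//             then "scan node: free off-heap column vectors"
{code}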

The BasePythonRunner task completion listener violates this assumption. It 
interrupts the writer thread, but does not wait for it to exit. This causes a 
race condition that can lead to an executor crash (a sketch of the problematic 
listener follows the steps below):
1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of 
the writer thread, but the writer thread does not check it immediately.
4. The child plan's task completion listener frees its off-heap memory, 
invalidating the row that the Python writer thread is processing.
5. The Python writer thread attempts to access the invalidated row. The 
use-after-free triggers a segfault that crashes the executor.
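
For illustration only, a sketch of the hazard (not the actual patch; `context`
and `writerThread` are hypothetical handles to the TaskContext and the
WriterThread instance):

{code:scala}
// Sketch of the hazard, not the actual patch. `context` and `writerThread`
// are hypothetical handles.
context.addTaskCompletionListener[Unit] { _ =>
  writerThread.interrupt()  // step 3: sets the flag; the writer may be mid-row
  // No writerThread.join() here, so the scan node's listener (step 4) can
  // free the off-heap memory while the writer still holds a row (step 5).
  // Waiting for the writer to exit before returning would close the window.
}
{code}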

https://issues.apache.org/jira/browse/SPARK-33277 describes the same issue, but 
the fix was incomplete: it did not address the case where the Python writer 
thread accesses a freed row.

> Python UDF after off-heap vectorized reader can cause crash due to 
> use-after-free in writer thread
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37088
>                 URL: https://issues.apache.org/jira/browse/SPARK-37088
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core, SQL
>    Affects Versions: 3.0.3, 3.1.2, 3.2.0
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>            Priority: Major
>


