ankurdave opened a new pull request #34245: URL: https://github.com/apache/spark/pull/34245
### What changes were proposed in this pull request?

Python UDFs in Spark SQL are run in a separate Python process. The Python process is fed input by a dedicated thread (`BasePythonRunner.WriterThread`). This writer thread drives the child plan by pulling rows from its output iterator and serializing them across a socket.

When the child exec node is the off-heap vectorized Parquet reader, these rows are backed by off-heap memory. The child node uses a task completion listener to free the off-heap memory at the end of the task, which invalidates the output iterator and any rows it has produced. Since task completion listeners are registered bottom-up and executed in reverse order of registration, this is safe as long as an exec node never accesses its input after its task completion listener has executed.[^1]

The BasePythonRunner task completion listener violates this assumption. It interrupts the writer thread, but does not wait for it to exit. This causes a race condition that can lead to an executor crash:

1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread, but the writer thread does not check it immediately.
4. The child plan's task completion listener frees its off-heap memory, invalidating the row that the Python writer thread is processing.
5. The Python writer thread attempts to access the invalidated row. The use-after-free triggers a segfault that crashes the executor.

This PR fixes the bug by making the BasePythonRunner task completion listener wait for the writer thread to exit before returning. This prevents its input from being invalidated while the thread is running. The sequence of events is now as follows:

1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread and waits for the writer thread to exit.
4. The child plan's task completion listener can safely free its off-heap memory without invalidating live rows.

[^1]: This guarantee was not historically recognized, leading to similar bugs as far back as 2014 ([SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019?focusedCommentId=13953661&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13953661)). The root cause was the lack of a reliably-ordered mechanism for operators to free resources at the end of a task. Such a mechanism (task completion listeners) was added and gradually refined, and we can now make this guarantee explicit.

### Why are the changes needed?

Without this PR, attempting to use Python UDFs while the off-heap vectorized Parquet reader is enabled (`spark.sql.columnVector.offheap.enabled true`) can cause executors to segfault.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A [previous PR](https://github.com/apache/spark/pull/30177) reduced the likelihood of encountering this race condition, but did not eliminate it. The accompanying tests were therefore flaky and had to be disabled. This PR eliminates the race condition, allowing us to re-enable these tests. One of the tests, `test_pandas_udf_scalar`, previously failed 30/1000 times and now always succeeds.

An internal workload previously failed with a segfault about 40% of the time when run with `spark.sql.columnVector.offheap.enabled true`, and now succeeds 100% of the time.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
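
For readers unfamiliar with the interrupt-then-join pattern described in the fix, here is a minimal, self-contained Java sketch. It is not Spark's code: `WriterThreadJoinSketch`, `Buffer`, and `runTask` are hypothetical names chosen for illustration, a plain flag stands in for off-heap memory, and a stack of `Runnable`s stands in for task completion listeners that run in reverse order of registration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names come from Spark.
public class WriterThreadJoinSketch {

    /** Stand-in for off-heap memory: reading it after free() counts as a use-after-free. */
    static final class Buffer {
        private final AtomicBoolean freed = new AtomicBoolean(false);
        void free() { freed.set(true); }
        boolean isFreed() { return freed.get(); }
    }

    /** Runs one simulated task. Returns true if the writer ever saw a freed buffer. */
    static boolean runTask() {
        Buffer buffer = new Buffer();
        AtomicBoolean useAfterFree = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Thread writer = new Thread(() -> {
            started.countDown();
            // Keep "serializing rows" until the interrupt status is observed.
            while (!Thread.currentThread().isInterrupted()) {
                if (buffer.isFreed()) {
                    useAfterFree.set(true); // would be a segfault with real off-heap memory
                }
            }
        }, "writer");

        // A stack models listeners that execute in reverse order of registration.
        Deque<Runnable> listeners = new ArrayDeque<>();
        // Registered first (child plan), so it runs last: frees the buffer.
        listeners.push(buffer::free);
        // Registered second (consumer), so it runs first: interrupt AND wait.
        listeners.push(() -> {
            writer.interrupt();
            try {
                writer.join(); // without this join, free() could race with the writer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        writer.start();
        try {
            started.await(); // make sure the writer is running before the task "finishes"
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // Task completion: run the listeners in LIFO order.
        while (!listeners.isEmpty()) {
            listeners.pop().run();
        }
        return useAfterFree.get();
    }

    public static void main(String[] args) {
        System.out.println("use-after-free observed: " + runTask());
    }
}
```

Because the consumer's listener returns only after `join()` completes, the buffer is freed strictly after the writer has stopped reading it. Removing the `join()` call from the sketch reintroduces a race of the kind this PR describes, although whether it manifests on any given run is timing-dependent.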
