ankurdave opened a new pull request, #36065: URL: https://github.com/apache/spark/pull/36065
### What changes were proposed in this pull request?

When calling a Python UDF on a DataFrame with large rows, a deadlock can occur involving the following three threads:

1. The Scala task executor thread. During task execution, this thread is responsible for reading the output produced by the Python process. In this case, however, the task has finished early, so the thread is no longer reading that output; instead, it is waiting for the Scala WriterThread to exit so that it can finish the task.
2. The Scala WriterThread. This thread is trying to send a large row to the Python process and is waiting for the Python process to read it.
3. The Python process. This process is trying to send a large output to the Scala task executor thread and is waiting for that thread to read it.

We considered three possible solutions:

1. When the task completes, make the Scala task executor thread close the socket before waiting for the Scala WriterThread to exit. If the WriterThread is blocked on a large write, this would interrupt the write and allow the WriterThread to exit. However, it would prevent Python worker reuse.
2. Modify PythonWorkerFactory to use interruptible I/O. [java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer)) supports interruptible blocking operations, so an interrupted WriterThread could exit even while blocked on a large write (see the second sketch at the end of this message). However, this change would be invasive.
3. Modify the existing PythonRunner.MonitorThread to detect this deadlock and kill the Python worker. The MonitorThread currently kills the Python worker only if the task itself is interrupted; in this case the task completes normally, so the MonitorThread takes no action. Instead, the MonitorThread should detect that the task has completed while the Python writer thread is still running, which indicates the deadlock (see the first sketch at the end of this message).

This PR implements Option 3.

### Why are the changes needed?

To fix a deadlock that can cause PySpark queries to hang.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test that previously encountered the deadlock and timed out, and now succeeds.
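The first sketch below pictures the Option 3 detection logic. It is a minimal illustration, not the actual Spark code: the `taskCompleted`, `writerStopped`, and `killPythonWorker` hooks are hypothetical stand-ins for Spark's TaskContext and worker-management internals, and the 10-second grace period is an arbitrary value chosen for illustration.

```scala
// Hedged sketch of the Option 3 watchdog: after the task completes,
// give the writer thread a grace period, then kill the Python worker
// if the writer is still blocked (the deadlock described above).
class WriterMonitorThread(
    taskCompleted: () => Boolean,      // hypothetical: has the task finished?
    writerStopped: () => Boolean,      // hypothetical: has the writer thread exited?
    killPythonWorker: () => Unit)      // hypothetical: destroy the Python worker
  extends Thread("Writer Monitor for Python worker") {

  setDaemon(true)

  override def run(): Unit = {
    // Poll until the task has finished.
    while (!taskCompleted()) {
      Thread.sleep(100)
    }
    // The task is done. If the writer thread is still running, it is
    // blocked sending a large row to the Python worker while the worker
    // is itself blocked sending output back: nobody can make progress.
    // Allow a grace period for the writer to finish on its own.
    val deadline = System.currentTimeMillis() + 10 * 1000
    while (!writerStopped() && System.currentTimeMillis() < deadline) {
      Thread.sleep(100)
    }
    // Still blocked after the grace period: break the cycle by killing
    // the worker, sacrificing worker reuse for this one task.
    if (!writerStopped()) {
      killPythonWorker()
    }
  }
}
```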

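For context on Option 2, this second sketch illustrates why `java.nio.channels.SocketChannel` writes are interruptible while plain `java.io` stream writes are not: interrupting a thread blocked in `SocketChannel.write` closes the channel and raises `ClosedByInterruptException`, letting the writer exit. The `writeInterruptibly` helper is hypothetical and shown only to demonstrate the mechanism; it is not part of this PR.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedByInterruptException, SocketChannel}

object InterruptibleWrite {
  // Hypothetical helper: write `data` to `channel`, exiting cleanly if
  // this thread is interrupted mid-write instead of blocking forever.
  def writeInterruptibly(channel: SocketChannel, data: Array[Byte]): Unit = {
    val buf = ByteBuffer.wrap(data)
    try {
      while (buf.hasRemaining) {
        // SocketChannel implements InterruptibleChannel: if another
        // thread interrupts this one while it is blocked here, the
        // channel is closed and ClosedByInterruptException is thrown.
        channel.write(buf)
      }
    } catch {
      case _: ClosedByInterruptException =>
        // The writer was interrupted while blocked on the write; it
        // can now release its resources and exit.
    }
  }
}
```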