HyukjinKwon opened a new pull request, #56772:
URL: https://github.com/apache/spark/pull/56772
### What changes were proposed in this pull request?

This PR makes the blocking read of the foreachBatch Python worker response in `StreamingPythonRunner` abortable. As a result, stopping a Spark Connect streaming query that uses `foreachBatch` reliably terminates even when the Python worker is wedged.

- `StreamingPythonRunner.readInterruptibly(dataIn)` performs the per-batch `readInt()` under a small watchdog thread. The watchdog stops the worker once the read should be abandoned. That happens when either the calling (micro-batch) thread is interrupted or an installed `shouldAbortRead` check trips. Stopping the worker closes its channel, which unblocks the in-flight read with an `AsynchronousCloseException`, so the batch (and the query) can unwind promptly. When no abort is requested, the read blocks normally, so a legitimately slow batch is never killed.
- The Connect foreachBatch cleaner cache (`StreamingForeachBatchHelper.CleanerCache.registerCleanerForQuery`) installs `() => !query.isActive` as that check when it registers a query's runner cleaner.
- `StreamingForeachBatchHelper` now calls `runner.readInterruptibly(dataIn)` instead of `dataIn.readInt()`.

### Why are the changes needed?

`SparkConnectSessionHolderSuite` "python foreachBatch process: process terminates after query is stopped" is flaky, and the underlying behavior is a real hang. The micro-batch execution thread runs the foreachBatch function, which blocks reading the Python worker response via `Channels.newInputStream(channel).readInt()`. The worker can be wedged, for example in the middle of a re-entrant Spark Connect call. In that case two mechanisms fail to break the read:

- `query.stop()`'s `Thread.interrupt()` does not break it. Interrupting the reader thread does not close the channel, and the interrupt flag is not even observed on the stream thread.
- A socket read timeout does not break it either, because `SO_TIMEOUT` has no effect on channel reads.

So `stop()` blocks until `spark.sql.streaming.stopTimeout` elapses. With the default (infinite) timeout, it hangs indefinitely.
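The following minimal Scala sketch illustrates the watchdog pattern described above. It assumes that the worker's reply is read through `Channels.newInputStream` from a blocking `java.nio` channel that another thread may close. The names `InterruptibleReadSketch.readInterruptibly`, `resource`, `shouldAbortRead`, and the 50 ms `pollMillis` interval are hypothetical. They are for illustration only and are not Spark's actual `StreamingPythonRunner` API. Where the PR stops the Python worker, the sketch simply closes the channel.

```scala
import java.io.{Closeable, DataInputStream, IOException}
import java.util.concurrent.atomic.AtomicBoolean

// HYPOTHETICAL standalone helper for illustration; not Spark's API.
object InterruptibleReadSketch {

  /**
   * Reads one Int from `in`. While the read is blocked, a watchdog thread
   * closes `resource` (the blocking java.nio channel behind `in`) if the
   * caller thread is interrupted or `shouldAbortRead()` returns true.
   * Closing the channel makes the blocked read fail with an
   * AsynchronousCloseException instead of hanging. If no abort is
   * requested, the read simply blocks until data arrives.
   */
  def readInterruptibly(
      in: DataInputStream,
      resource: Closeable,
      shouldAbortRead: () => Boolean,
      pollMillis: Long = 50L): Int = {
    val caller = Thread.currentThread()
    val readFinished = new AtomicBoolean(false)

    val watchdogBody: Runnable = () => {
      try {
        var aborted = false
        while (!readFinished.get() && !aborted) {
          if (caller.isInterrupted() || shouldAbortRead()) {
            resource.close() // unblocks the in-flight read on the caller thread
            aborted = true
          } else {
            Thread.sleep(pollMillis) // assumed polling interval
          }
        }
      } catch {
        case _: InterruptedException => () // read finished; caller told us to exit
        case _: IOException => ()          // close failed; nothing more to do
      }
    }

    val watchdog = new Thread(watchdogBody, "read-watchdog")
    watchdog.setDaemon(true)
    watchdog.start()
    try {
      in.readInt() // blocks; throws a ClosedChannelException subtype if closed
    } finally {
      readFinished.set(true)
      watchdog.interrupt() // wake the watchdog so it exits on its own
    }
  }
}
```

The watchdog acts only when the caller is interrupted or `shouldAbortRead()` returns `true`, so a slow but healthy read keeps blocking. In the Connect setting, the check would be something like `() => !query.isActive`. The sketch leaves one benign race: the watchdog can still close the channel just after a read succeeds. That is acceptable only when an abort means the worker is being torn down anyway.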
SPARK-56586 bounded the test with a stop timeout and retries. That only turns the hang into a failure, because the retries deadlock identically. `query.stop()` sets the query state to `TERMINATED` (so `isActive` becomes `false`) before it blocks waiting for the thread to die. This makes `() => !query.isActive` a reliable signal that lets the watchdog break the wedged read promptly.

### Does this PR introduce _any_ user-facing change?

No. Stopping a Spark Connect `foreachBatch` streaming query is now reliable where it could previously hang. There is no API or behavior change for successful queries.

### How was this patch tested?

The previously flaky test was repeated on a fork runner. `SparkConnectSessionHolderSuite` "python foreachBatch process: process terminates after query is stopped" passed 12/12 times on the first attempt with no retries. Previously, it failed even after 3 retries. Targeted diagnostics on the same path also confirmed the root cause and the close→unblock mechanism.

### Was this patch authored or co-authored using generative AI tooling?

Yes, generated by Isaac.
