HyukjinKwon opened a new pull request, #56799: URL: https://github.com/apache/spark/pull/56799
### What changes were proposed in this pull request? This PR makes `StreamingPythonRunner`'s blocking read of the Python worker response interruptible when a Spark Connect `foreachBatch` query is being stopped. `StreamingPythonRunner.readInterruptibly` guards the blocking `Channels.newInputStream(channel).readInt()` with a watchdog thread. The Connect `foreachBatch` cleaner cache installs an abort condition `() => !query.isActive` when it registers a query's cleaner. Once that abort condition has held for a short grace period (long enough for a responsive worker to finish the in-flight batch so a normal stop is undisturbed), the watchdog closes the worker channel, which unblocks a genuinely wedged read with an `AsynchronousCloseException` so the batch/query unwinds promptly. A legitimately slow batch (query still active) is never killed. ### Why are the changes needed? `SparkConnectSessionHolderSuite` — `python foreachBatch process: process terminates after query is stopped` flakily fails (and previously hung for ~150 minutes) because `query.stop()` cannot terminate a wedged micro-batch thread. The micro-batch execution thread runs the `foreachBatch` function, which blocks in `StreamingPythonRunner` reading the Python worker response. When the worker is wedged (e.g. deadlocked on a re-entrant Spark Connect call), that read is broken by neither `query.stop()`'s `Thread.interrupt()` (interrupting the reader does not close the channel, and the interrupt flag is never observed on the stream thread) nor a socket read timeout (`SO_TIMEOUT` has no effect on channel reads). So `stop()` blocks until `spark.sql.streaming.stopTimeout` elapses (default: infinite → hangs forever). The previous bound+retry only turned the hang into a failure; the retries deadlock identically. This is one of the two remaining unmerged fixes keeping the apache/spark master matrix builds red — it fails the `connect` module on the scheduled Maven JDK 25 and JDK 21 (ARM) lanes. ### Does this PR introduce _any_ user-facing change? No. The change only affects how a wedged Python worker read is aborted on query stop; a normal stop is undisturbed. ### How was this patch tested? `connect` module tests on a fork, including the previously-flaky `SparkConnectSessionHolderSuite` foreachBatch termination test (also repeated x12 in a focused workflow to deflake). - **Before (failing on apache/spark master):** `Build / Maven (Scala 2.13, JDK 25)` — `connect` module, `StreamingForeachBatchHelperSuite` foreachBatch termination test times out (`spark.sql.streaming.stopTimeout`): https://github.com/apache/spark/actions/runs/28179702547 - **After (passing on this branch):** `connect` module green on the fork — the formerly-hanging `python foreachBatch process: process terminates after query is stopped` now passes in ~5.5s (connect module: `Tests: succeeded 1612, failed 0`): https://github.com/HyukjinKwon/spark/actions/runs/28204993346/job/83555910566 ### Was this patch authored or co-authored using generative AI tooling? Yes. This pull request and its description were written by Isaac. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
