HyukjinKwon opened a new pull request, #56799:
URL: https://github.com/apache/spark/pull/56799

   ### What changes were proposed in this pull request?
   
   This PR makes `StreamingPythonRunner`'s blocking read of the Python worker 
response interruptible when a Spark Connect `foreachBatch` query is being 
stopped.
   
   `StreamingPythonRunner.readInterruptibly` guards the blocking 
`Channels.newInputStream(channel).readInt()` with a watchdog thread. The 
Connect `foreachBatch` cleaner cache installs an abort condition `() => 
!query.isActive` when it registers a query's cleaner. Once that abort condition 
has held for a short grace period (long enough for a responsive worker to 
finish the in-flight batch so a normal stop is undisturbed), the watchdog 
closes the worker channel, which unblocks a genuinely wedged read with an 
`AsynchronousCloseException` so the batch/query unwinds promptly. A 
legitimately slow batch (query still active) is never killed.
   
   ### Why are the changes needed?
   
   `SparkConnectSessionHolderSuite` — `python foreachBatch process: process 
terminates after query is stopped` flakily fails (and previously hung for ~150 
minutes) because `query.stop()` cannot terminate a wedged micro-batch thread.
   
   The micro-batch execution thread runs the `foreachBatch` function, which 
blocks in `StreamingPythonRunner` reading the Python worker response. When the 
worker is wedged (e.g. deadlocked on a re-entrant Spark Connect call), that 
read is broken by neither `query.stop()`'s `Thread.interrupt()` (interrupting 
the reader does not close the channel, and the interrupt flag is never observed 
on the stream thread) nor a socket read timeout (`SO_TIMEOUT` has no effect on 
channel reads). So `stop()` blocks until `spark.sql.streaming.stopTimeout` 
elapses (default: infinite → hangs forever). The previous bound+retry only 
turned the hang into a failure; the retries deadlock identically.
   
   This is one of the two remaining unmerged fixes keeping the apache/spark 
master matrix builds red — it fails the `connect` module on the scheduled Maven 
JDK 25 and JDK 21 (ARM) lanes.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. The change only affects how a wedged Python worker read is aborted on 
query stop; a normal stop is undisturbed.
   
   ### How was this patch tested?
   
   `connect` module tests on a fork, including the previously-flaky 
`SparkConnectSessionHolderSuite` foreachBatch termination test (also repeated 
x12 in a focused workflow to deflake).
   
   - **Before (failing on apache/spark master):** `Build / Maven (Scala 2.13, 
JDK 25)` — `connect` module, `StreamingForeachBatchHelperSuite` foreachBatch 
termination test times out (`spark.sql.streaming.stopTimeout`): 
https://github.com/apache/spark/actions/runs/28179702547
   - **After (passing on this branch):** `connect` module green on the fork — 
the formerly-hanging `python foreachBatch process: process terminates after 
query is stopped` now passes in ~5.5s (connect module: `Tests: succeeded 1612, 
failed 0`): 
https://github.com/HyukjinKwon/spark/actions/runs/28204993346/job/83555910566
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes.
   
   This pull request and its description were written by Isaac.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to