HyukjinKwon opened a new pull request, #56772:
URL: https://github.com/apache/spark/pull/56772

   ### What changes were proposed in this pull request?
   
   This PR makes a blocking read of the foreachBatch Python worker response in 
`StreamingPythonRunner` abortable, so that stopping a Spark Connect streaming 
query that uses `foreachBatch` reliably terminates even when the Python worker 
is wedged.
   
   - `StreamingPythonRunner.readInterruptibly(dataIn)` performs the per-batch 
`readInt()` while a small watchdog thread monitors it. The watchdog stops the 
worker once the read should be abandoned: either the calling (micro-batch) 
thread has been interrupted, or the installed `shouldAbortRead` check returns 
`true`. Stopping the worker closes its channel, which unblocks the in-flight 
read with an `AsynchronousCloseException` so that the batch (and the query) can 
unwind promptly. When no abort is requested, the read blocks normally, so a 
legitimately slow batch is never killed.
   - The Connect foreachBatch cleaner cache 
(`StreamingForeachBatchHelper.CleanerCache.registerCleanerForQuery`) installs 
`() => !query.isActive` as the `shouldAbortRead` check when it registers the 
cleaner for a query's runner.
   - `StreamingForeachBatchHelper` now calls `runner.readInterruptibly(dataIn)` 
instead of `dataIn.readInt()`.
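   The watchdog in the first bullet can be sketched with plain JDK channels. This is a minimal sketch under stated assumptions, not the code in this PR: `WatchdogRead.readInterruptibly` is a hypothetical stand-in that takes the channel and the abort check as parameters and closes the channel directly (the real helper stops the Python worker, which closes the channel as a side effect), and the 50 ms poll interval is arbitrary.

```scala
import java.io.DataInputStream
import java.nio.channels.{Channels, ReadableByteChannel}
import java.util.concurrent.atomic.AtomicBoolean

object WatchdogRead {
  // Hypothetical helper: read one Int, but let a watchdog thread close the channel
  // if the caller is interrupted or shouldAbortRead() returns true. Closing the
  // channel makes the blocked read fail with AsynchronousCloseException.
  def readInterruptibly(
      channel: ReadableByteChannel,
      shouldAbortRead: () => Boolean,
      pollMillis: Long = 50L): Int = {
    val caller = Thread.currentThread()
    // Whoever flips this first (reader or watchdog) decides the outcome.
    val settled = new AtomicBoolean(false)
    val watch: Runnable = () => {
      try {
        while (!settled.get && !caller.isInterrupted && !shouldAbortRead()) {
          Thread.sleep(pollMillis)
        }
        if (settled.compareAndSet(false, true)) channel.close()
      } catch {
        case _: InterruptedException => // the read finished first; nothing to do
      }
    }
    val watchdog = new Thread(watch, "read-watchdog")
    watchdog.setDaemon(true)
    watchdog.start()
    try {
      // No timeout: with no abort requested this blocks as long as the worker takes.
      new DataInputStream(Channels.newInputStream(channel)).readInt()
    } finally {
      settled.compareAndSet(false, true)
      watchdog.interrupt()
    }
  }
}
```

Plugging in `() => !query.isActive` as `shouldAbortRead`, as the cleaner cache does, means the watchdog fires only after the query has been stopped; a slow but healthy batch simply keeps blocking in `readInt()`.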
   
   ### Why are the changes needed?
   
   `SparkConnectSessionHolderSuite` "python foreachBatch process: process 
terminates after query is stopped" is flaky, and the flakiness reflects a real 
hang rather than a test-only problem.
   
   The micro-batch execution thread runs the foreachBatch function, which 
blocks reading the Python worker's response via 
`Channels.newInputStream(channel).readInt()`. When the worker is wedged (e.g. 
it is in the middle of a re-entrant Spark Connect call), neither of the obvious 
escape hatches breaks that read. `query.stop()`'s `Thread.interrupt()` does 
not: interrupting the reader thread does not close the channel, and the 
interrupt flag is not even observed on the stream thread. A socket read timeout 
does not either: `SO_TIMEOUT` has no effect on channel reads. So `stop()` 
blocks until `spark.sql.streaming.stopTimeout` elapses, and with the default 
(infinite) timeout it hangs indefinitely. SPARK-56586 bounded the test with a 
stop timeout and retries, but that only turns the hang into a failure, because 
every retry deadlocks in the same way.
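   The `SO_TIMEOUT` point can be checked in isolation with plain JDK classes (nothing Spark-specific; `SoTimeoutDemo` is a hypothetical name). The sketch connects a loopback `SocketChannel` to a peer that never writes, sets a 100 ms `SO_TIMEOUT` on the channel's socket adaptor, and reads through `Channels.newInputStream` as the foreachBatch path does. The read should still be blocked after one second, and only closing the channel unblocks it.

```scala
import java.io.DataInputStream
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.{Channels, ServerSocketChannel, SocketChannel}
import java.util.concurrent.atomic.AtomicReference

object SoTimeoutDemo {
  // Returns (blockedPastTimeout, unblockedByClose, failureSeenByReader).
  def run(): (Boolean, Boolean, Option[Throwable]) = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    val client = SocketChannel.open(server.getLocalAddress)
    val peer = server.accept() // connected but never writes: a stand-in for a wedged worker
    client.socket().setSoTimeout(100) // 100 ms read timeout on the socket adaptor

    val failure = new AtomicReference[Throwable]()
    val read: Runnable = () => {
      try {
        // Same shape as the foreachBatch path: a stream over the channel, then readInt().
        new DataInputStream(Channels.newInputStream(client)).readInt()
        ()
      } catch {
        case t: Throwable => failure.set(t)
      }
    }
    val reader = new Thread(read, "reader")
    reader.setDaemon(true)
    reader.start()

    reader.join(1000) // ten times the configured timeout
    val blockedPastTimeout = reader.isAlive && failure.get == null

    client.close() // closing the channel is what actually unblocks the read
    reader.join(5000)
    val unblockedByClose = !reader.isAlive

    peer.close()
    server.close()
    (blockedPastTimeout, unblockedByClose, Option(failure.get))
  }
}
```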
   
   `query.stop()` sets the query state to `TERMINATED` (so `isActive` becomes 
`false`) before it blocks waiting for the stream thread to die. That makes 
`() => !query.isActive` a reliable signal for the watchdog to break the wedged 
read promptly.
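   The ordering argument can be checked with a toy model. Everything here is hypothetical: `ToyQuery` is not Spark's stream execution, its `stop()` never interrupts the batch thread (modelling the case where the interrupt does not reach the blocked read), and a `java.nio.channels.Pipe` with no writer stands in for the wedged worker. Because `stop()` flips the state before `join()`, a watchdog polling `!query.isActive` closes the channel and `stop()` returns instead of hanging.

```scala
import java.io.DataInputStream
import java.nio.channels.{Channels, ClosedChannelException, Pipe}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical toy model: stop() publishes TERMINATED before joining the
// micro-batch thread, and never interrupts it.
final class ToyQuery(batch: ToyQuery => Unit) {
  private val state = new AtomicReference[String]("ACTIVE")
  private val runBatch: Runnable = () => batch(this)
  private val streamThread = new Thread(runBatch, "micro-batch")

  def isActive: Boolean = state.get == "ACTIVE"
  def start(): Unit = streamThread.start()
  def stop(): Unit = {
    state.set("TERMINATED") // isActive is false from this point on...
    streamThread.join()     // ...so a watchdog polling it can unblock the thread we wait on
  }
}

object StopOrderingDemo {
  def run(): String = {
    val pipe = Pipe.open() // nothing ever writes to the sink: a wedged worker
    val outcome = new AtomicReference[String]("pending")
    val query = new ToyQuery(q => {
      val shouldAbortRead: () => Boolean = () => !q.isActive
      val watch: Runnable = () => {
        while (!shouldAbortRead()) Thread.sleep(20)
        pipe.source().close() // unblocks the readInt() below
      }
      val watchdog = new Thread(watch, "read-watchdog")
      watchdog.setDaemon(true)
      watchdog.start()
      try {
        new DataInputStream(Channels.newInputStream(pipe.source())).readInt()
        outcome.set("read")
      } catch {
        case _: ClosedChannelException => outcome.set("aborted")
      }
    })
    query.start()
    Thread.sleep(100) // give the batch time to block in readInt()
    query.stop()      // returns once the watchdog has broken the read
    outcome.get
  }
}
```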
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. Stopping a Spark Connect `foreachBatch` streaming query is now reliable 
where it could previously hang, but there is no API or behavior change for 
successful queries.
   
   ### How was this patch tested?
   
   Ran the previously flaky test repeatedly on a fork runner: 
`SparkConnectSessionHolderSuite` "python foreachBatch process: process 
terminates after query is stopped" passed 12 out of 12 runs on the first 
attempt, with no retries (before this change it failed even after 3 retries). 
The root cause and the close→unblock mechanism were also confirmed with 
targeted diagnostics on the same code path.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes, generated by Isaac.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.