HyukjinKwon opened a new pull request, #36589: URL: https://github.com/apache/spark/pull/36589
### What changes were proposed in this pull request?

This PR proposes to make a `foreachBatch` streaming query stop gracefully by handling the resulting interruption exceptions in `StreamExecution.isInterruptionException`.

Because there is no straightforward way to access the original JVM exception, this PR relies on string pattern matching for now (see also "Why are the changes needed?" below). Py4J produces the matched error message in only one place, https://github.com/py4j/py4j/blob/master/py4j-python/src/py4j/protocol.py#L326-L328, so the approach should at least work.

### Why are the changes needed?

In `foreachBatch`, the Python user-defined function for a microbatch runs to completion even when `StreamingQuery.stop` is invoked. However, when the user-defined function attempts any Py4J access:

- With pinned thread mode disabled, the interrupt exception does not block the Python function, which runs to the end in a different thread.
- With pinned thread mode enabled, the interrupt exception is raised in the same thread, and the Python function then raises a Py4J exception in that thread.

The latter case is a problem because of how the interrupt propagates. It is first thrown on the JVM side as `java.lang.InterruptedException`, surfaces in the Python callback server as `py4j.protocol.Py4JJavaError`, and finally reaches the JVM as `py4j.Py4JException`. `py4j.Py4JException` is not listed in `StreamExecution.isInterruptionException`, which is the check that lets the query stop gracefully.

### Does this PR introduce _any_ user-facing change?

Yes. A `foreachBatch` query now stops gracefully when it is stopped.

### How was this patch tested?

Manually tested with:

```python
import time

# Assumes `spark` is an existing SparkSession, e.g. the one in the PySpark shell.
def func(batch_df, batch_id):
    time.sleep(10)  # keep the microbatch running while q.stop() is called
    print(batch_df.count())  # a Py4J call made after the query was stopped

q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
time.sleep(5)
q.stop()
```
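The propagation chain and the string-matching idea described above can be modeled in plain Python. This is an illustrative sketch only: the exception classes are hypothetical stand-ins for the real JVM and Py4J types, the message shape is assumed from how Py4J formats `Py4JJavaError`, and both the list of interrupt exception names and the regex are assumptions, not copied from Spark's Scala implementation of `StreamExecution`.

```python
import re

# Hypothetical stand-ins for the exception types in the chain. These are plain
# Python classes for illustration, not the real JVM or Py4J classes.
class JavaInterruptedException(Exception):
    """Models java.lang.InterruptedException on the JVM side."""

class Py4JJavaError(Exception):
    """Models py4j.protocol.Py4JJavaError raised inside the Python callback."""

class Py4JException(Exception):
    """Models py4j.Py4JException received back on the JVM side."""

def simulate_stop_during_py4j_call() -> Exception:
    """Wraps an interrupt the way the PR description says it travels.

    Only message text crosses each JVM/Python boundary in this model, so the
    original exception object and its type are gone when the JVM sees it.
    """
    jvm_error = JavaInterruptedException("java.lang.InterruptedException")
    # Assumed message shape, modeled on how Py4J formats Py4JJavaError.
    python_error = Py4JJavaError(
        f"An error occurred while calling o42.count.\n: {jvm_error}")
    return Py4JException(f"py4j.protocol.Py4JJavaError: {python_error}")

# Type-based check: recognizes only the original interrupt type (simplified).
def is_interruption_by_type(exc: Exception) -> bool:
    return isinstance(exc, JavaInterruptedException)

# Illustrative list of interrupt-related JVM exception names (an assumption).
INTERRUPT_NAMES = (
    "java.lang.InterruptedException",
    "java.io.InterruptedIOException",
    "java.nio.channels.ClosedByInterruptException",
)

# String-pattern check: the Py4J error header, then a line break, then one of
# the interrupt exception names.
PY4J_INTERRUPT_PATTERN = re.compile(
    r"py4j\.protocol\.Py4JJavaError: An error occurred while calling"
    r".+(?:\r\n|\r|\n): (?:" + "|".join(map(re.escape, INTERRUPT_NAMES)) + r")"
)

def is_interruption_by_message(exc: Exception) -> bool:
    return PY4J_INTERRUPT_PATTERN.search(str(exc)) is not None

error = simulate_stop_during_py4j_call()
print(is_interruption_by_type(error))     # False: the original type was lost
print(is_interruption_by_message(error))  # True: the message still names it
```

A type check alone misses the wrapped error because, in this model, only text survives the round trip; matching on the message recovers the interrupt at the cost of depending on Py4J's message format, which is the trade-off the PR accepts.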
