dylanwong250 commented on code in PR #53313:
URL: https://github.com/apache/spark/pull/53313#discussion_r2605061612
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/ChecksumCheckpointFileManager.scala:
##########
@@ -500,16 +500,11 @@ class ChecksumCancellableFSDataOutputStream(
@volatile private var closed = false
override def cancel(): Unit = {
- val mainFuture = Future {
- mainStream.cancel()
- }(uploadThreadPool)
-
- val checksumFuture = Future {
- checksumStream.cancel()
- }(uploadThreadPool)
-
- awaitResult(mainFuture, Duration.Inf)
- awaitResult(checksumFuture, Duration.Inf)
+ // Cancel both streams synchronously, because consider we want to cancel the while the thread
+ // is in the interrupted state, If we cancel each stream using a future, the thread will throw
+ // InterruptedException and the thread will not be cancelled.
Review Comment:
The main issue was that this method was being called when the thread was in
an interrupted state. While in this state, scheduling Futures will immediately
throw an InterruptedException. I am not sure if there are cases when cancel
itself would throw an InterruptedException, but I added a tryWithSafeFinally
block so that both cancels will always be attempted.
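
To illustrate the behavior described above, here is a minimal standalone sketch, not the actual Spark code. `FakeStream`, `CancelSketch`, `cancelBoth`, and `awaitFailsWhenInterrupted` are hypothetical names introduced for illustration, and a plain `try`/`finally` stands in for Spark's `tryWithSafeFinally`. It shows two things under those assumptions: blocking on a future with plain `scala.concurrent.Await` from a thread whose interrupt flag is set throws `InterruptedException`, and a synchronous cancel wrapped in `try`/`finally` still attempts both cancels.

```scala
import java.io.IOException
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for a cancellable output stream; it only records
// whether cancel() was invoked and can optionally fail.
final class FakeStream(failOnCancel: Boolean = false) {
  @volatile var cancelled = false
  def cancel(): Unit = {
    cancelled = true
    if (failOnCancel) throw new IOException("cancel failed")
  }
}

object CancelSketch {
  // Cancel both streams on the calling thread. The finally block ensures the
  // second cancel is attempted even if the first one throws.
  def cancelBoth(mainStream: FakeStream, checksumStream: FakeStream): Unit = {
    try {
      mainStream.cancel()
    } finally {
      checksumStream.cancel()
    }
  }

  // Returns true if awaiting a never-completed future on an interrupted
  // thread fails with InterruptedException rather than timing out.
  def awaitFailsWhenInterrupted(): Boolean = {
    Thread.currentThread().interrupt()
    try {
      Await.result(Promise[Unit]().future, 1.second)
      false
    } catch {
      case _: InterruptedException => true
      case _: TimeoutException => false
    } finally {
      Thread.interrupted() // clear the flag so later code is unaffected
    }
  }

  def main(args: Array[String]): Unit = {
    val mainStream = new FakeStream(failOnCancel = true)
    val checksumStream = new FakeStream()

    // Simulate cancel() being called while the thread is interrupted.
    Thread.currentThread().interrupt()
    try {
      cancelBoth(mainStream, checksumStream)
    } catch {
      case e: IOException => println(s"first cancel failed: ${e.getMessage}")
    } finally {
      Thread.interrupted()
    }
    println(s"main=${mainStream.cancelled}, checksum=${checksumStream.cancelled}")
    println(s"await throws when interrupted: ${awaitFailsWhenInterrupted()}")
  }
}
```

The synchronous version never blocks on a future, so the interrupt flag does not get in the way of the cancels themselves. Whether a real stream's `cancel()` can itself throw `InterruptedException` is left open here, as in the comment above.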