changgyoopark-db commented on code in PR #48208:
URL: https://github.com/apache/spark/pull/48208#discussion_r1773212180
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -226,17 +243,13 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
observedMetrics ++ accumulatedInPython))
}
- lock.synchronized {
- // Synchronized before sending ResultComplete, and up until completing the result stream
- // to prevent a situation in which a client of reattachable execution receives
- // ResultComplete, and proceeds to send ReleaseExecute, and that triggers an interrupt
- // before it finishes.
-
- if (interrupted) {
- // check if it got interrupted at the very last moment
- throw new InterruptedException()
- }
- completed = true // no longer interruptible
+ // State transition should be atomic to prevent a situation in which a client of reattachable
+ // execution receives ResultComplete, and proceeds to send ReleaseExecute, and that triggers
+ // an interrupt before it finishes.
+ if (state.compareAndExchangeRelease(
+ ThreadState.started,
+ ThreadState.completed) == ThreadState.started) {
+ // Now, the execution cannot be interrupted.
Review Comment:
Exactly. I'll add more comments.
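
For readers following the thread, here is a minimal sketch of the idea behind the hunk above: a single compare-and-exchange decides whether completion or interruption wins, so no lock is needed around the final step. The `StateSketch` class, its method names, and the integer values in `ThreadState` are assumptions made for illustration; only `ThreadState.started`, `ThreadState.completed`, and the `compareAndExchangeRelease` call appear in the diff. It also assumes `state` is a `java.util.concurrent.atomic.AtomicInteger` on Java 9 or later.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical constants: the names started/completed come from the diff,
// the other names and all numeric values are assumed for this sketch.
object ThreadState {
  val notStarted: Int = 0
  val started: Int = 1
  val interrupted: Int = 2
  val completed: Int = 3
}

// Hypothetical stand-in for the runner's state handling, not the real class.
class StateSketch {
  private val state = new AtomicInteger(ThreadState.notStarted)

  // Moves notStarted -> started; returns false if already started.
  def start(): Boolean =
    state.compareAndSet(ThreadState.notStarted, ThreadState.started)

  // Execution thread: attempts started -> completed in one atomic step.
  // compareAndExchangeRelease returns the value it observed, so the
  // transition succeeded only if that value was `started`.
  def tryComplete(): Boolean =
    state.compareAndExchangeRelease(
      ThreadState.started,
      ThreadState.completed) == ThreadState.started

  // Interrupting thread: succeeds only while the state is still `started`.
  def tryInterrupt(): Boolean =
    state.compareAndSet(ThreadState.started, ThreadState.interrupted)

  def current: Int = state.get()
}
```

In this sketch, whichever of `tryComplete()` and `tryInterrupt()` runs first changes the state away from `started`, so the other call fails and returns `false`. That is the property the removed `lock.synchronized` block used to provide.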
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]