changgyoopark-db commented on code in PR #48208:
URL: https://github.com/apache/spark/pull/48208#discussion_r1773214868
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -139,23 +129,50 @@ private[connect] class ExecuteThreadRunner(executeHolder:
ExecuteHolder) extends
}
}
} catch {
- ErrorUtils.handleError(
- "execute",
- executeHolder.responseObserver,
- executeHolder.sessionHolder.userId,
- executeHolder.sessionHolder.sessionId,
- Some(executeHolder.eventsManager),
- interrupted)
+ case e: Throwable if state.getAcquire() !=
ThreadState.startedInterrupted =>
+ ErrorUtils.handleError(
+ "execute",
+ executeHolder.responseObserver,
+ executeHolder.sessionHolder.userId,
+ executeHolder.sessionHolder.sessionId,
+ Some(executeHolder.eventsManager),
+ false)(e)
+ } finally {
+ // Make sure to transition to completed in order to prevent the thread
from being interrupted
+ // afterwards.
+ var currentState = state.getAcquire()
+ while (currentState == ThreadState.started ||
+ currentState == ThreadState.startedInterrupted) {
+ val interrupted = currentState == ThreadState.startedInterrupted
+ val prevState = state.compareAndExchangeRelease(currentState,
ThreadState.completed)
+ if (prevState == currentState) {
+ if (interrupted) {
+ try {
+ ErrorUtils.handleError(
+ "execute",
+ executeHolder.responseObserver,
+ executeHolder.sessionHolder.userId,
+ executeHolder.sessionHolder.sessionId,
+ Some(executeHolder.eventsManager),
+ true)(new SparkSQLException("OPERATION_CANCELED", Map.empty))
+ } finally {
+ executeHolder.cleanup()
+ }
+ }
+ return
+ }
+ currentState = prevState
+ }
}
}
// Inner executeInternal is wrapped by execute() for error handling.
- private def executeInternal() = {
- // synchronized - check if already got interrupted while starting.
- lock.synchronized {
- if (interrupted) {
- throw new InterruptedException()
- }
+ private def executeInternal(): Unit = {
+ val prevState = state.compareAndExchangeRelease(ThreadState.notStarted,
ThreadState.started)
+ if (prevState != ThreadState.notStarted && prevState !=
ThreadState.started) {
+ // Silently return, expecting that the caller would handle the
interruption.
+ assert(prevState != ThreadState.completed)
Review Comment:
If we remove the state transition in `start`, then you're right. I'll make
it clearer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]