changgyoopark-db commented on code in PR #48208:
URL: https://github.com/apache/spark/pull/48208#discussion_r1773222971
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala:
##########
@@ -32,76 +32,73 @@ import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag,
SparkConnectService}
import org.apache.spark.sql.connect.utils.ErrorUtils
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.Utils
/**
* This class launches the actual execution in an execution thread. The
execution pushes the
* responses to a ExecuteResponseObserver in executeHolder.
*/
private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder)
extends Logging {
- private val promise: Promise[Unit] = Promise[Unit]()
+ /** The thread state. */
+ private val state: AtomicInteger = new AtomicInteger(ThreadState.notStarted)
// The newly created thread will inherit all InheritableThreadLocals used by
Spark,
// e.g. SparkContext.localProperties. If considering implementing a
thread-pool,
// forwarding of thread locals needs to be taken into account.
- private val executionThread: ExecutionThread = new ExecutionThread(promise)
-
- private var started: Boolean = false
-
- private var interrupted: Boolean = false
-
- private var completed: Boolean = false
-
- private val lock = new Object
+ private val executionThread: ExecutionThread = new ExecutionThread()
/** Launches the execution in a background thread, returns immediately. */
private[connect] def start(): Unit = {
- lock.synchronized {
- assert(!started)
- // Do not start if already interrupted.
- if (!interrupted) {
- executionThread.start()
- started = true
- }
- }
- }
+ if (state.getAcquire() == ThreadState.notStarted) {
+ executionThread.start()
- /**
- * Register a callback that gets executed after completion/interruption of
the execution thread.
- */
- private[connect] def processOnCompletion(callback: Try[Unit] => Unit): Unit
= {
-
promise.future.onComplete(callback)(ExecuteThreadRunner.namedExecutionContext)
+ // If the thread is started earlier than this, the thread will change
the state itself.
+ state.compareAndExchangeRelease(ThreadState.notStarted,
ThreadState.started)
+ }
}
/**
- * Interrupt the executing thread.
+ * Interrupts the execution thread if the thread is running and has yet to
be completed.
+ *
* @return
- * true if it was not interrupted before, false if it was already
interrupted or completed.
+ * true if the thread is running and interrupted.
*/
private[connect] def interrupt(): Boolean = {
- lock.synchronized {
- if (!started && !interrupted) {
- // execution thread hasn't started yet, and will not be started.
- // handle the interrupted error here directly.
- interrupted = true
- ErrorUtils.handleError(
- "execute",
- executeHolder.responseObserver,
- executeHolder.sessionHolder.userId,
- executeHolder.sessionHolder.sessionId,
- Some(executeHolder.eventsManager),
- interrupted)(new SparkSQLException("OPERATION_CANCELED", Map.empty))
- true
- } else if (!interrupted && !completed) {
- // checking completed prevents sending interrupt onError after
onCompleted
- interrupted = true
- executionThread.interrupt()
- true
+ var currentState = state.getAcquire()
+ while (currentState == ThreadState.notStarted || currentState ==
ThreadState.started) {
+ val newState = if (currentState == ThreadState.notStarted) {
+ ThreadState.interrupted
} else {
- false
+ ThreadState.startedInterrupted
}
+
+ val prevState = state.compareAndExchangeRelease(currentState, newState)
+ if (prevState == currentState) {
+ if (prevState == ThreadState.notStarted) {
+ // The execution thread has not been started, and will never be
started.
Review Comment:
You're right, the thread will silently exit; I need to update the comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]