changgyoopark-db commented on code in PR #48034:
URL: https://github.com/apache/spark/pull/48034#discussion_r1751619058


##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala:
##########
@@ -108,43 +110,50 @@ private[connect] class SparkConnectExecutionManager() 
extends Logging {
    * execution if still running, free all resources.
    */
   private[connect] def removeExecuteHolder(key: ExecuteKey, abandoned: Boolean 
= false): Unit = {
-    var executeHolder: Option[ExecuteHolder] = None
+    val executeHolder = executions.get(key)
+
+    if (executeHolder == null) {
+      return
+    }
+
+    // Put into abandonedTombstones before removing it from executions, so 
that the client ends up
+    // getting an INVALID_HANDLE.OPERATION_ABANDONED error on a retry.
+    if (abandoned) {
+      abandonedTombstones.put(key, executeHolder.getExecuteInfo)
+    }
+
+    // Remove the execution from the map *after* putting it in 
abandonedTombstones.
+    executions.remove(key)
+    executeHolder.sessionHolder.removeExecuteHolder(executeHolder.operationId)
+
     executionsLock.synchronized {
-      executeHolder = executions.remove(key)
-      executeHolder.foreach { e =>
-        // Put into abandonedTombstones under lock, so that if it's accessed 
it will end up
-        // with INVALID_HANDLE.OPERATION_ABANDONED error.
-        if (abandoned) {
-          abandonedTombstones.put(key, e.getExecuteInfo)
-        }
-        e.sessionHolder.removeExecuteHolder(e.operationId)
-      }
       if (executions.isEmpty) {
         lastExecutionTimeMs = Some(System.currentTimeMillis())
       }
-      logInfo(log"ExecuteHolder ${MDC(LogKeys.EXECUTE_KEY, key)} is removed.")
     }
-    // close the execution outside the lock
-    executeHolder.foreach { e =>
-      e.close()
-      if (abandoned) {
-        // Update in abandonedTombstones: above it wasn't yet updated with 
closedTime etc.
-        abandonedTombstones.put(key, e.getExecuteInfo)
-      }
+
+    logInfo(log"ExecuteHolder ${MDC(LogKeys.EXECUTE_KEY, key)} is removed.")
+
+    executeHolder.close()
+    if (abandoned) {
+      // Update in abandonedTombstones: above it wasn't yet updated with 
closedTime etc.
+      abandonedTombstones.put(key, executeHolder.getExecuteInfo)
     }
   }
 
   private[connect] def getExecuteHolder(key: ExecuteKey): 
Option[ExecuteHolder] = {
-    executionsLock.synchronized {
-      executions.get(key)
-    }
+    Option(executions.get(key))
   }
 
   private[connect] def removeAllExecutionsForSession(key: SessionKey): Unit = {
-    val sessionExecutionHolders = executionsLock.synchronized {
-      executions.filter(_._2.sessionHolder.key == key)
-    }
-    sessionExecutionHolders.foreach { case (_, executeHolder) =>
+    var sessionExecutionHolders = mutable.ArrayBuffer[ExecuteHolder]()
+    executions.forEach((_, executeHolder) => {
+      if (executeHolder.sessionHolder.key == key) {
+        sessionExecutionHolders += executeHolder
+      }
+    })
+
+    sessionExecutionHolders.foreach { executeHolder =>

Review Comment:
   Ah, that's a valid point, thanks! So, the following execution sequence will 
be really problematic.
   
   Thread 1: check closedTimeMs -> None.
   Thread 2: set closedTimeMs.
   Thread 2: removeAllExecutionsForSession.
   Thread 1: add the executeHolder to the map.
   
   Furthermore, closedTimeMs is a plain Option, which is *semantically* not 
quite thread-safe (it will be 16 bytes long, so if it is not perfectly aligned 
in memory, it is unsafe).
   
   I'll try to fix those issues in this PR too!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to