changgyoopark-db commented on code in PR #48034:
URL: https://github.com/apache/spark/pull/48034#discussion_r1754501627
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala:
##########
@@ -77,26 +77,28 @@ private[connect] class SparkConnectExecutionManager()
extends Logging {
request.getSessionId,
previousSessionId)
val executeHolder = new ExecuteHolder(request, sessionHolder)
+
+ // Check if the operation already exists, either in the active execution
map, or in the
+ // graveyard of tombstones of executions that have been abandoned. The
latter is to prevent
+ // double executions when the client retries, thinking it never reached
the server, but in
+ // fact it did, and already got removed as abandoned.
+ if (executions.putIfAbsent(executeHolder.key, executeHolder) != null) {
+ throw new SparkSQLException(
+ errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
+ messageParameters = Map("handle" -> executeHolder.operationId))
+ } else if (getAbandonedTombstone(executeHolder.key).isDefined) {
+ executions.remove(executeHolder.key)
+ throw new SparkSQLException(
+ errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+ messageParameters = Map("handle" -> executeHolder.operationId))
+ }
+
+ sessionHolder.addExecuteHolder(executeHolder)
+
executionsLock.synchronized {
- // Check if the operation already exists, both in active executions, and
in the graveyard
- // of tombstones of executions that have been abandoned.
- // The latter is to prevent double execution when a client retries
execution, thinking it
- // never reached the server, but in fact it did, and already got removed
as abandoned.
- if (executions.get(executeHolder.key).isDefined) {
- throw new SparkSQLException(
- errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
- messageParameters = Map("handle" -> executeHolder.operationId))
- }
- if (getAbandonedTombstone(executeHolder.key).isDefined) {
- throw new SparkSQLException(
- errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
- messageParameters = Map("handle" -> executeHolder.operationId))
- }
- sessionHolder.addExecuteHolder(executeHolder)
- executions.put(executeHolder.key, executeHolder)
Review Comment:
-> https://github.com/apache/spark/pull/48071
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]