juliuszsompolski commented on code in PR #48034:
URL: https://github.com/apache/spark/pull/48034#discussion_r1751665785
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala:
##########
@@ -77,26 +77,28 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
request.getSessionId,
previousSessionId)
val executeHolder = new ExecuteHolder(request, sessionHolder)
+
+ // Check if the operation already exists, either in the active execution map, or in the
+ // graveyard of tombstones of executions that have been abandoned. The latter is to prevent
+ // double executions when the client retries, thinking it never reached the server, but in
+ // fact it did, and already got removed as abandoned.
+ if (executions.putIfAbsent(executeHolder.key, executeHolder) != null) {
+ throw new SparkSQLException(
+ errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
+ messageParameters = Map("handle" -> executeHolder.operationId))
+ } else if (getAbandonedTombstone(executeHolder.key).isDefined) {
+ executions.remove(executeHolder.key)
+ throw new SparkSQLException(
+ errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+ messageParameters = Map("handle" -> executeHolder.operationId))
+ }
+
+ sessionHolder.addExecuteHolder(executeHolder)
+
executionsLock.synchronized {
- // Check if the operation already exists, both in active executions, and in the graveyard
- // of tombstones of executions that have been abandoned.
- // The latter is to prevent double execution when a client retries execution, thinking it
- // never reached the server, but in fact it did, and already got removed as abandoned.
- if (executions.get(executeHolder.key).isDefined) {
- throw new SparkSQLException(
- errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS",
- messageParameters = Map("handle" -> executeHolder.operationId))
- }
- if (getAbandonedTombstone(executeHolder.key).isDefined) {
- throw new SparkSQLException(
- errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
- messageParameters = Map("handle" -> executeHolder.operationId))
- }
- sessionHolder.addExecuteHolder(executeHolder)
- executions.put(executeHolder.key, executeHolder)
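
The hunk above replaces a lock-guarded check-then-put with a single atomic `putIfAbsent`. A minimal sketch of that distinction, with hypothetical names (not the actual ExecuteHolder/ExecuteKey types):

```scala
import java.util.concurrent.ConcurrentHashMap

object PutIfAbsentSketch {
  private val executions = new ConcurrentHashMap[String, String]()
  private val executionsLock = new Object

  // Old shape: check-then-act, correct only while holding executionsLock.
  def registerLocked(operationId: String, holder: String): Unit =
    executionsLock.synchronized {
      if (executions.containsKey(operationId)) {
        throw new IllegalStateException(s"operation already exists: $operationId")
      }
      executions.put(operationId, holder)
    }

  // New shape: the existence check and the insert happen as one atomic
  // step, so no external lock is needed for this part.
  def registerAtomic(operationId: String, holder: String): Unit =
    if (executions.putIfAbsent(operationId, holder) != null) {
      throw new IllegalStateException(s"operation already exists: $operationId")
    }
}
```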
Review Comment:
For https://github.com/apache/spark/pull/48034#discussion_r1751660500 it is important that we now first put the execution into the global map, and only then into the session map.
But is it now possible that some operation retrieves an execution from the global map, validating that it exists, and then assumes it is also present in the session map, racing with this code before the execution has been added there?
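
To make the race window concrete, here is a hypothetical sketch (simplified stand-ins for the global `executions` map and the per-session map, not the actual SparkConnectExecutionManager code):

```scala
import java.util.concurrent.ConcurrentHashMap

object RaceWindowSketch {
  final case class ExecuteKey(sessionId: String, operationId: String)

  private val globalExecutions = new ConcurrentHashMap[ExecuteKey, String]()
  private val sessionExecutions = new ConcurrentHashMap[ExecuteKey, String]()

  // Writer: the ordering introduced by this change: publish to the
  // global map first, then to the session map.
  def createExecution(key: ExecuteKey, holder: String): Unit = {
    globalExecutions.putIfAbsent(key, holder)
    // <-- window: the key is already visible in the global map here,
    //     but not yet in the session map.
    sessionExecutions.put(key, holder)
  }

  // Reader: validates existence against the global map, then assumes the
  // session map agrees. If it runs inside the window above, the second
  // lookup misses even though the execution "exists".
  def lookup(key: ExecuteKey): Option[String] =
    if (globalExecutions.containsKey(key)) Option(sessionExecutions.get(key))
    else None
}
```

If such a reader path exists, it would either need to tolerate a miss in the session map or perform the second lookup under executionsLock.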