hvanhovell commented on code in PR #48071:
URL: https://github.com/apache/spark/pull/48071#discussion_r1756976955
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -138,39 +137,38 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
}
/**
- * Add ExecuteHolder to this session.
+ * Add an operation ID to this session.
*
- * Called only by SparkConnectExecutionManager under executionsLock.
+ * Called only by SparkConnectExecutionManager when a new execution is started.
*/
- @GuardedBy("SparkConnectService.executionManager.executionsLock")
- private[service] def addExecuteHolder(executeHolder: ExecuteHolder): Unit = {
+ private[service] def addOperationId(operationId: String): Unit = {
if (closedTimeMs.isDefined) {
// Do not accept new executions if the session is closing.
throw new SparkSQLException(
errorClass = "INVALID_HANDLE.SESSION_CLOSED",
messageParameters = Map("handle" -> sessionId))
}
- val oldExecute = executions.putIfAbsent(executeHolder.operationId, executeHolder)
- if (oldExecute != null) {
- // the existence of this should alrady be checked by SparkConnectExecutionManager
- throw new IllegalStateException(
- s"ExecuteHolder with opId=${executeHolder.operationId} already exists!")
+ var existed = true
+ operationIds.computeIfAbsent(
Review Comment:
How about this:
```scala
val alreadyExists = operationIds.putIfAbsent(operationId, ()) != null
```
IMO that is simpler and more readable than setting a mutable `existed` flag inside `computeIfAbsent`.
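
For illustration, here is a minimal, self-contained sketch of the `putIfAbsent` idiom. It assumes `operationIds` is a `java.util.concurrent.ConcurrentHashMap` keyed by operation ID. The map's value type is not visible in this hunk, so `java.lang.Boolean` is used as a stand-in reference type to keep the comparison with `null` straightforward. `PutIfAbsentSketch` and `register` are hypothetical names, not part of the PR.

```scala
import java.util.concurrent.ConcurrentHashMap

object PutIfAbsentSketch {
  // Hypothetical stand-in for SessionHolder.operationIds; the value type is an assumption.
  private val operationIds = new ConcurrentHashMap[String, java.lang.Boolean]()

  // Returns true if the ID was newly registered, false if it was already present.
  def register(operationId: String): Boolean = {
    // putIfAbsent atomically inserts the mapping and returns the previous value,
    // or null when the key had no mapping.
    val alreadyExists =
      operationIds.putIfAbsent(operationId, java.lang.Boolean.TRUE) != null
    !alreadyExists
  }

  def main(args: Array[String]): Unit = {
    println(register("op-1")) // first registration of this ID
    println(register("op-1")) // same ID again
  }
}
```

The check and the insert happen in a single atomic call, so no mutable flag has to be captured by a lambda.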