juliuszsompolski commented on code in PR #44670:
URL: https://github.com/apache/spark/pull/44670#discussion_r1448658127
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -214,6 +214,9 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
logInfo(s"Session $key accessed, time $lastAccessTimeMs.")
}
+ // Exposed for testing.
+ private[connect] def getAccessTime(): Long = lastAccessTimeMs
Review Comment:
nit: you can use `getSessionHolderInfo.lastAccessTimeMs`
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala:
##########
@@ -29,25 +29,26 @@ class SparkConnectReattachExecuteHandler(
extends Logging {
def handle(v: proto.ReattachExecuteRequest): Unit = {
- val executeHolder = SparkConnectService.executionManager
- .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
- .getOrElse {
- if (SparkConnectService.executionManager
- .getAbandonedTombstone(
- ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
- .isDefined) {
- logDebug(s"Reattach operation abandoned: ${v.getOperationId}")
- throw new SparkSQLException(
- errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
- messageParameters = Map("handle" -> v.getOperationId))
+ val sessionHolder = SparkConnectService.sessionManager
+ .getIsolatedSession(SessionKey(v.getUserContext.getUserId, v.getSessionId))
- } else {
- logDebug(s"Reattach operation not found: ${v.getOperationId}")
- throw new SparkSQLException(
- errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND",
- messageParameters = Map("handle" -> v.getOperationId))
- }
+ val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse {
+ if (SparkConnectService.executionManager
+ .getAbandonedTombstone(
+ ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
+ .isDefined) {
+ logDebug(s"Reattach operation abandoned: ${v.getOperationId}")
+ throw new SparkSQLException(
+ errorClass = "INVALID_HANDLE.OPERATION_ABANDONED",
+ messageParameters = Map("handle" -> v.getOperationId))
+
Review Comment:
nit: no empty line.
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala:
##########
@@ -29,25 +29,26 @@ class SparkConnectReattachExecuteHandler(
extends Logging {
def handle(v: proto.ReattachExecuteRequest): Unit = {
- val executeHolder = SparkConnectService.executionManager
- .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId))
Review Comment:
I reviewed and traced the code to check whether this change could introduce any
race conditions: `SparkConnectService.executionManager.getExecuteHolder` was a
one-step lookup under `executionsLock`, while `getIsolatedSession` +
`sessionHolder.executeHolder` is a two-step lookup. But I believe all is good.
Even if the session/execution is closed in the meantime,
`executeHolder.runGrpcResponseSender` -> `addGrpcResponseSender` would detect
that the execution was concurrently closed.
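To make the argument concrete, here is a minimal, self-contained sketch of the pattern being reviewed: a two-step lookup (session first, then execution) with no lock held across the steps, where the sender-attach step re-checks for a concurrent close. All names here (`Session`, `Execution`, `TwoStepLookup`, `attachSender`) are hypothetical stand-ins for the real `SessionHolder`/`ExecuteHolder` machinery, not Spark Connect code:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical model of an execution that can be closed concurrently.
final class Execution(val operationId: String) {
  @volatile private var closed = false
  def close(): Unit = closed = true
  // Stands in for addGrpcResponseSender's re-check: even if the execution was
  // closed after the lookup returned it, attaching a sender detects the close
  // and fails safely instead of streaming to a dead execution.
  def attachSender(): Boolean = !closed
}

// Hypothetical session holding its executions.
final class Session {
  private val executions = new ConcurrentHashMap[String, Execution]()
  def add(e: Execution): Unit = executions.put(e.operationId, e)
  // Step 2: resolve the execution inside the already-resolved session.
  def executeHolder(operationId: String): Option[Execution] =
    Option(executions.get(operationId))
}

object TwoStepLookup {
  // Step 1 then step 2, with no shared lock spanning the two steps; this is
  // the window the review comment traced.
  def lookup(
      sessions: Map[String, Session],
      sessionId: String,
      operationId: String): Option[Execution] =
    sessions.get(sessionId).flatMap(_.executeHolder(operationId))
}
```

The point of the sketch: a close landing between `lookup` and `attachSender` is still caught, because the attach step validates the execution's state rather than trusting the earlier lookup.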
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]