This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 84dbe846b30 [SPARK-44625][CONNECT] SparkConnectExecutionManager to track all executions 84dbe846b30 is described below commit 84dbe846b30d5250169b834b182779a104570888 Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Fri Aug 11 18:32:07 2023 +0200 [SPARK-44625][CONNECT] SparkConnectExecutionManager to track all executions ### What changes were proposed in this pull request? SparkConnectExecutionManager tracks all executions (ExecuteHolder) in all sessions of Spark Connect. It tracks which executions have RPCs (ExecuteGrpcResponseSender) attached to them. If an execution gets abandoned (it's not cleared with ReleaseExecute by the client, but no new RPC arrives), it will be automatically interrupted and removed after a timeout. Note on the failure of the buf breaking-change check: ``` Error: Field "2" on message "ReleaseExecuteResponse" moved from outside to inside a oneof. Error: buf found 1 breaking changes. ``` The message ReleaseExecuteResponse has not been released yet, so it's not a breaking change compared to any released version. ### Why are the changes needed? Need the SparkConnectExecutionManager to track reattachable executions that got abandoned by the client. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pending, but the SparkConnectExecutionManager allows tests to inspect execution state, so it allows writing more unit tests about reattachable execution. Closes #42423 from juliuszsompolski/SPARK-44625. 
Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../src/main/resources/error/error-classes.json | 5 + .../src/main/protobuf/spark/connect/base.proto | 8 +- .../apache/spark/sql/connect/config/Connect.scala | 24 +++ .../execution/ExecuteGrpcResponseSender.scala | 119 ++++++------ .../execution/ExecuteResponseObserver.scala | 9 +- .../spark/sql/connect/service/ExecuteHolder.scala | 143 +++++++++++--- .../spark/sql/connect/service/SessionHolder.scala | 39 ++-- .../service/SparkConnectExecutePlanHandler.scala | 19 +- .../service/SparkConnectExecutionManager.scala | 209 +++++++++++++++++++++ .../SparkConnectReattachExecuteHandler.scala | 31 ++- .../SparkConnectReleaseExecuteHandler.scala | 51 ++--- .../sql/connect/service/SparkConnectService.scala | 11 ++ .../spark/sql/connect/utils/ErrorUtils.scala | 15 +- .../connect/planner/SparkConnectPlannerSuite.scala | 5 +- ...-error-conditions-invalid-handle-error-class.md | 4 + python/pyspark/sql/connect/proto/base_pb2.py | 8 +- python/pyspark/sql/connect/proto/base_pb2.pyi | 23 ++- python/pyspark/sql/connect/proto/base_pb2_grpc.py | 2 +- 18 files changed, 566 insertions(+), 159 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 75125d2275d..133c2dd826c 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1552,6 +1552,11 @@ "Handle must be an UUID string of the format '00112233-4455-6677-8899-aabbccddeeff'" ] }, + "OPERATION_ABANDONED" : { + "message" : [ + "Operation was considered abandoned because of inactivity and removed." + ] + }, "OPERATION_ALREADY_EXISTS" : { "message" : [ "Operation already exists." 
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index 79dbadba5bb..65e2493f836 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -772,8 +772,10 @@ message ReleaseExecuteResponse { // Session id in which the release was running. string session_id = 1; - // Operation id of the operation which the release concerns. - string operation_id = 2; + // Operation id of the operation on which the release executed. + // If the operation couldn't be found (because e.g. it was concurrently released), will be unset. + // Otherwise, it will be equal to the operation_id from request. + optional string operation_id = 2; } // Main interface for the SparkConnect service. @@ -809,7 +811,7 @@ service SparkConnectService { // Release an reattachable execution, or parts thereof. // The ExecutePlan must have been started with ReattachOptions.reattachable=true. // Non reattachable executions are released automatically and immediately after the ExecutePlan - // RPC and ReleaseExecute doesn't need to be used. + // RPC and ReleaseExecute may not be used. 
rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {} } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index 0be53064cc0..9c03107db27 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -74,6 +74,30 @@ object Connect { .intConf .createWithDefault(1024) + val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT = + ConfigBuilder("spark.connect.execute.manager.detachedTimeout") + .internal() + .doc("Timeout after which executions without an attached RPC will be removed.") + .version("3.5.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5m") + + val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL = + ConfigBuilder("spark.connect.execute.manager.maintenanceInterval") + .internal() + .doc("Interval at which execution manager will search for abandoned executions to remove.") + .version("3.5.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") + + val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE = + ConfigBuilder("spark.connect.execute.manager.abandonedTombstonesSize") + .internal() + .doc("Maximum size of the cache of abandoned executions.") + .version("3.5.0") + .intConf + .createWithDefaultString("10000") + val CONNECT_EXECUTE_REATTACHABLE_ENABLED = ConfigBuilder("spark.connect.execute.reattachable.enabled") .internal() diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 7b51a90ca37..6b8fcde1156 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -24,7 +24,8 @@ import org.apache.spark.{SparkEnv, SparkSQLException} import org.apache.spark.internal.Logging import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE} -import org.apache.spark.sql.connect.service.ExecuteHolder +import org.apache.spark.sql.connect.service.{ExecuteHolder, SparkConnectService} +import org.apache.spark.sql.connect.utils.ErrorUtils /** * ExecuteGrpcResponseSender sends responses to the GRPC stream. It consumes responses from @@ -37,12 +38,14 @@ import org.apache.spark.sql.connect.service.ExecuteHolder private[connect] class ExecuteGrpcResponseSender[T <: Message]( val executeHolder: ExecuteHolder, grpcObserver: StreamObserver[T]) - extends Logging { + extends Logging { self => + // the executionObserver object is used as a synchronization lock between the + // ExecuteGrpcResponseSender consumer and ExecuteResponseObserver producer. private var executionObserver = executeHolder.responseObserver .asInstanceOf[ExecuteResponseObserver[T]] - private var detached = false + private var interrupted = false // Signal to wake up when grpcCallObserver.isReady() private val grpcCallObserverReadySignal = new Object @@ -51,42 +54,50 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( private var consumeSleep = 0L private var sendSleep = 0L + // Thread handling the processing, in case it's done in the background. + private var backgroundThread: Option[Thread] = None + /** - * Detach this sender from executionObserver. Called only from executionObserver that this - * sender is attached to. Lock on executionObserver is held, and notifyAll will wake up this - * sender if sleeping. + * Interrupt this sender and make it exit. 
*/ - def detach(): Unit = executionObserver.synchronized { - if (detached == true) { - throw new IllegalStateException("ExecuteGrpcResponseSender already detached!") - } - detached = true + def interrupt(): Unit = executionObserver.synchronized { + interrupted = true executionObserver.notifyAll() } def run(lastConsumedStreamIndex: Long): Unit = { if (executeHolder.reattachable) { - // In reattachable execution, check if grpcObserver is ready for sending, by using - // setOnReadyHandler of the ServerCallStreamObserver. Otherwise, calling grpcObserver.onNext - // can queue the responses without sending them, and it is unknown how far behind it is, and - // hence how much the executionObserver needs to buffer. + // In reattachable execution we use setOnReadyHandler and grpcCallObserver.isReady to control + // backpressure. See sendResponse. // - // Because OnReady events get queued on the same GRPC inboud queue as the executePlan or - // reattachExecute RPC handler that this is executing in, OnReady events will not arrive and - // not trigger the OnReadyHandler unless this thread returns from executePlan/reattachExecute. + // Because calls to OnReadyHandler get queued on the same GRPC inboud queue as the executePlan + // or reattachExecute RPC handler that this is executing in, they will not arrive and not + // trigger the OnReadyHandler unless this thread returns from executePlan/reattachExecute. // Therefore, we launch another thread to operate on the grpcObserver and send the responses, // while this thread will exit from the executePlan/reattachExecute call, allowing GRPC // to send the OnReady events. 
// See https://github.com/grpc/grpc-java/issues/7361 - val t = new Thread( - s"SparkConnectGRPCSender_" + - s"opId=${executeHolder.operationId}_startIndex=$lastConsumedStreamIndex") { - override def run(): Unit = { - execute(lastConsumedStreamIndex) - } - } - executeHolder.grpcSenderThreads += t + backgroundThread = Some( + new Thread( + s"SparkConnectGRPCSender_" + + s"opId=${executeHolder.operationId}_startIndex=$lastConsumedStreamIndex") { + override def run(): Unit = { + try { + execute(lastConsumedStreamIndex) + } catch { + // This is executing in it's own thread, so need to handle RPC error like the + // SparkConnectService handlers do. + ErrorUtils.handleError( + "async-grpc-response-sender", + observer = grpcObserver, + userId = executeHolder.request.getUserContext.getUserId, + sessionId = executeHolder.request.getSessionId) + } finally { + executeHolder.removeGrpcResponseSender(self) + } + } + }) val grpcCallObserver = grpcObserver.asInstanceOf[ServerCallStreamObserver[T]] grpcCallObserver.setOnReadyHandler(() => { @@ -97,16 +108,17 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( }) // Start the thread and exit - t.start() + backgroundThread.foreach(_.start()) } else { // Non reattachable execute runs directly in the GRPC thread. try { execute(lastConsumedStreamIndex) } finally { + executeHolder.removeGrpcResponseSender(this) if (!executeHolder.reattachable) { // Non reattachable executions release here immediately. // (Reattachable executions release with ReleaseExecute RPC.) - executeHolder.close() + SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key) } } } @@ -159,9 +171,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( while (!finished) { var response: Option[CachedStreamResponse[T]] = None - // Conditions for exiting the inner loop: - // 1. 
was detached from response observer - def detachedFromObserver = detached + // Conditions for exiting the inner loop (and helpers to compute them): + // 1. was interrupted // 2. has a response to send def gotResponse = response.nonEmpty // 3. sent everything from the stream and the stream is finished @@ -170,24 +181,21 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( def deadlineLimitReached = sentResponsesSize > maximumResponseSize || deadlineTimeMillis < System.currentTimeMillis() - // Get next available response. - // Wait until either this sender got detached or next response is ready, - // or the stream is complete and it had already sent all responses. logTrace(s"Trying to get next response with index=$nextIndex.") executionObserver.synchronized { logTrace(s"Acquired executionObserver lock.") val sleepStart = System.nanoTime() var sleepEnd = 0L - while (!detachedFromObserver && + while (!interrupted && !gotResponse && !streamFinished && !deadlineLimitReached) { logTrace(s"Try to get response with index=$nextIndex from observer.") response = executionObserver.consumeResponse(nextIndex) logTrace(s"Response index=$nextIndex from observer: ${response.isDefined}") - // If response is empty, release executionObserver lock and wait to get notified. - // The state of detached, response and lastIndex are change under lock in - // executionObserver, and will notify upon state change. + // If response is empty, release executionObserver monitor and wait to get notified. + // The state of interrupted, response and lastIndex are changed under executionObserver + // monitor, and will notify upon state change. 
if (response.isEmpty) { val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis()) logTrace(s"Wait for response to become available with timeout=$timeout ms.") @@ -197,7 +205,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } } logTrace( - s"Exiting loop: detached=$detached, " + + s"Exiting loop: interrupted=$interrupted, " + s"response=${response.map(r => ProtoUtils.abbreviate(r.response))}, " + s"lastIndex=${executionObserver.getLastResponseIndex()}, " + s"deadline=${deadlineLimitReached}") @@ -208,10 +216,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } // Process the outcome of the inner loop. - if (detachedFromObserver) { - // This sender got detached by the observer. - // This only happens if this RPC is actually dead, and the client already came back with - // a ReattachExecute RPC. Kill this RPC. + if (interrupted) { + // This sender got interrupted. Kill this RPC. logWarning( s"Got detached from opId=${executeHolder.operationId} at index ${nextIndex - 1}." + s"totalTime=${System.nanoTime - startTime}ns " + @@ -256,12 +262,11 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( } /** - * Send the response to the grpcCallObserver. In reattachable execution, we control the flow, - * and only pass the response to the grpcCallObserver when it's ready to send. Otherwise, - * grpcCallObserver.onNext() would return in a non-blocking way, but could queue responses - * without sending them if the client doesn't keep up receiving them. When pushing more - * responses to onNext(), there is no insight how far behind the service is in actually sending - * them out. + * Send the response to the grpcCallObserver. + * + * In reattachable execution, we control the backpressure and only send when the + * grpcCallObserver is in fact ready to send. + * * @param deadlineTimeMillis * when reattachable, wait for ready stream until this deadline. 
* @return @@ -278,12 +283,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( grpcObserver.onNext(response.response) true } else { - // In reattachable execution, we control the flow, and only pass the response to the + // In reattachable execution, we control the backpressure, and only pass the response to the // grpcCallObserver when it's ready to send. // Otherwise, grpcCallObserver.onNext() would return in a non-blocking way, but could queue // responses without sending them if the client doesn't keep up receiving them. // When pushing more responses to onNext(), there is no insight how far behind the service is - // in actually sending them out. + // in actually sending them out. See https://github.com/grpc/grpc-java/issues/1549 // By sending responses only when grpcCallObserver.isReady(), we control that the actual // sending doesn't fall behind what we push from here. // By using the deadline, we exit the RPC if the responses aren't picked up by the client. @@ -295,7 +300,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( logTrace(s"Acquired grpcCallObserverReadySignal lock.") val sleepStart = System.nanoTime() var sleepEnd = 0L - while (!grpcCallObserver.isReady() && deadlineTimeMillis >= System.currentTimeMillis()) { + // Conditions for exiting the inner loop + // 1. was detached + // 2. grpcCallObserver is ready to send more data + // 3. 
time deadline is reached + while (!interrupted && + !grpcCallObserver.isReady() && + deadlineTimeMillis >= System.currentTimeMillis()) { val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis()) var sleepStart = System.nanoTime() logTrace(s"Wait for grpcCallObserver to become ready with timeout=$timeout ms.") @@ -303,7 +314,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( logTrace(s"Reacquired grpcCallObserverReadySignal lock after waiting.") sleepEnd = System.nanoTime() } - if (grpcCallObserver.isReady()) { + if (!interrupted && grpcCallObserver.isReady()) { val sleepTime = if (sleepEnd > 0L) sleepEnd - sleepStart else 0L logDebug( s"SEND opId=${executeHolder.operationId} responseId=${response.responseId} " + @@ -313,7 +324,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( grpcCallObserver.onNext(response.response) true } else { - logTrace(s"grpcCallObserver is not ready, exiting.") + logTrace(s"exiting sendResponse without sending") false } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 0573f7b3dae..d9db07fd228 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -148,8 +148,8 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: /** Attach a new consumer (ExecuteResponseGRPCSender). 
*/ def attachConsumer(newSender: ExecuteGrpcResponseSender[T]): Unit = synchronized { - // detach the current sender before attaching new one - responseSender.foreach(_.detach()) + // interrupt the current sender before attaching new one + responseSender.foreach(_.interrupt()) responseSender = Some(newSender) } @@ -241,11 +241,6 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: finalProducedIndex.isDefined } - /** Consumer (ExecuteResponseGRPCSender) waits on the monitor of ExecuteResponseObserver. */ - private def notifyConsumer(): Unit = { - notifyAll() - } - /** * Remove cached responses after response with lastReturnedIndex is returned from getResponse. * Remove according to caching policy: diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 105af0dc0ba..bce07133392 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -84,19 +84,6 @@ private[connect] class ExecuteHolder( } } - /** - * True if there is currently an RPC (ExecutePlanRequest, ReattachExecute) attached to this - * execution. - */ - var attached: Boolean = true - - /** - * Threads that execute the ExecuteGrpcResponseSender and send the GRPC responses. - * - * TODO(SPARK-44625): Joining and cleaning up these threads during cleanup. 
- */ - val grpcSenderThreads: mutable.ArrayBuffer[Thread] = new mutable.ArrayBuffer[Thread]() - val responseObserver: ExecuteResponseObserver[proto.ExecutePlanResponse] = new ExecuteResponseObserver[proto.ExecutePlanResponse](this) @@ -104,12 +91,35 @@ private[connect] class ExecuteHolder( private val runner: ExecuteThreadRunner = new ExecuteThreadRunner(this) + /** System.currentTimeMillis when this ExecuteHolder was created. */ + val creationTime = System.currentTimeMillis() + + /** + * None if there is currently an attached RPC (grpcResponseSenders not empty or during initial + * ExecutePlan handler). Otherwise, the System.currentTimeMillis when the last RPC detached + * (grpcResponseSenders became empty). + */ + @volatile var lastAttachedRpcTime: Option[Long] = None + + /** System.currentTimeMillis when this ExecuteHolder was closed. */ + private var closedTime: Option[Long] = None + + /** + * Attached ExecuteGrpcResponseSenders that send the GRPC responses. + * + * In most situations at most one, except network hang issues where temporarily there would be a + * stale one, before being interrupted by a new one in ReattachExecute. + */ + private val grpcResponseSenders + : mutable.ArrayBuffer[ExecuteGrpcResponseSender[proto.ExecutePlanResponse]] = + new mutable.ArrayBuffer[ExecuteGrpcResponseSender[proto.ExecutePlanResponse]]() + /** * Start the execution. The execution is started in a background thread in ExecuteThreadRunner. * Responses are produced and cached in ExecuteResponseObserver. A GRPC thread consumes the * responses by attaching an ExecuteGrpcResponseSender, * @see - * attachAndRunGrpcResponseSender. + * runGrpcResponseSender. 
*/ def start(): Unit = { runner.start() @@ -128,8 +138,9 @@ private[connect] class ExecuteHolder( * @param responseSender * the ExecuteGrpcResponseSender */ - def attachAndRunGrpcResponseSender( + def runGrpcResponseSender( responseSender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse]): Unit = { + addGrpcResponseSender(responseSender) responseSender.run(0) } @@ -142,13 +153,49 @@ private[connect] class ExecuteHolder( * the last response that was already consumed. The sender will start from response after * that. */ - def attachAndRunGrpcResponseSender( + def runGrpcResponseSender( responseSender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse], lastConsumedResponseId: String): Unit = { val lastConsumedIndex = responseObserver.getResponseIndexById(lastConsumedResponseId) + addGrpcResponseSender(responseSender) responseSender.run(lastConsumedIndex) } + private def addGrpcResponseSender( + sender: ExecuteGrpcResponseSender[proto.ExecutePlanResponse]) = synchronized { + if (closedTime.isEmpty) { + grpcResponseSenders += sender + lastAttachedRpcTime = None + } else { + // execution is closing... interrupt it already. + sender.interrupt() + } + } + + def removeGrpcResponseSender[_](sender: ExecuteGrpcResponseSender[_]): Unit = synchronized { + // if closed, we are shutting down and interrupting all senders already + if (closedTime.isEmpty) { + grpcResponseSenders -= + sender.asInstanceOf[ExecuteGrpcResponseSender[proto.ExecutePlanResponse]] + if (grpcResponseSenders.isEmpty) { + lastAttachedRpcTime = Some(System.currentTimeMillis()) + } + } + } + + /** + * For a short period in ExecutePlan after creation and until runGrpcResponseSender is called, + * there is no attached response sender, but yet we start with lastAttachedRpcTime = None, so we + * don't get garbage collected. End this grace period when the initial ExecutePlan ends. 
+ */ + def afterInitialRPC(): Unit = synchronized { + if (closedTime.isEmpty) { + if (grpcResponseSenders.isEmpty) { + lastAttachedRpcTime = Some(System.currentTimeMillis()) + } + } + } + /** * Remove cached responses from the response observer until and including the response with * given responseId. @@ -168,15 +215,30 @@ private[connect] class ExecuteHolder( } /** - * Close the execution and remove it from the session. Note: it first interrupts the runner if - * it's still running, and it waits for it to finish. + * Interrupt (if still running) and close the execution. + * + * Called only by SparkConnectExecutionManager.removeExecuteHolder, which then also removes the + * execution from global tracking and from its session. */ - def close(): Unit = { - runner.interrupt() - runner.join() - responseObserver.removeAll() - eventsManager.postClosed() - sessionHolder.removeExecuteHolder(operationId) + def close(): Unit = synchronized { + if (closedTime.isEmpty) { + // interrupt execution, if still running. + runner.interrupt() + // wait for execution to finish, to make sure no more results get pushed to responseObserver + runner.join() + // interrupt any attached grpcResponseSenders + grpcResponseSenders.foreach(_.interrupt()) + // if there were still any grpcResponseSenders, register detach time + if (grpcResponseSenders.nonEmpty) { + lastAttachedRpcTime = Some(System.currentTimeMillis()) + grpcResponseSenders.clear() + } + // remove all cached responses from observer + responseObserver.removeAll() + // post closed to UI + eventsManager.postClosed() + closedTime = Some(System.currentTimeMillis()) + } } /** @@ -187,6 +249,25 @@ private[connect] class ExecuteHolder( "SparkConnect_Execute_" + s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Tag_${tag}" } + + /** Get ExecuteInfo with information about this ExecuteHolder. 
*/ + def getExecuteInfo: ExecuteInfo = synchronized { + ExecuteInfo( + request = request, + userId = sessionHolder.userId, + sessionId = sessionHolder.sessionId, + operationId = operationId, + jobTag = jobTag, + sparkSessionTags = sparkSessionTags, + reattachable = reattachable, + status = eventsManager.status, + creationTime = creationTime, + lastAttachedRpcTime = lastAttachedRpcTime, + closedTime = closedTime) + } + + /** Get key used by SparkConnectExecutionManager global tracker. */ + def key: ExecuteKey = ExecuteKey(sessionHolder.userId, sessionHolder.sessionId, operationId) } /** Used to identify ExecuteHolder jobTag among SparkContext.SPARK_JOB_TAGS. */ @@ -221,3 +302,17 @@ object ExecuteSessionTag { if (sessionTag.startsWith(prefix)) Some(sessionTag) else None } } + +/** Information about an ExecuteHolder. */ +case class ExecuteInfo( + request: proto.ExecutePlanRequest, + userId: String, + sessionId: String, + operationId: String, + jobTag: String, + sparkSessionTags: Set[String], + reattachable: Boolean, + status: ExecuteStatus, + creationTime: Long, + lastAttachedRpcTime: Option[Long], + closedTime: Option[Long]) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 29134f0dc0d..b828d78710f 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -24,8 +24,7 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.{JobArtifactSet, SparkException, SparkSQLException} -import org.apache.spark.connect.proto +import org.apache.spark.{JobArtifactSet, SparkException} import org.apache.spark.internal.Logging import 
org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession @@ -42,7 +41,7 @@ import org.apache.spark.util.Utils case class SessionHolder(userId: String, sessionId: String, session: SparkSession) extends Logging { - val executions: ConcurrentMap[String, ExecuteHolder] = + private val executions: ConcurrentMap[String, ExecuteHolder] = new ConcurrentHashMap[String, ExecuteHolder]() val eventManager: SessionEventsManager = SessionEventsManager(this, new SystemClock()) @@ -56,23 +55,23 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio private lazy val listenerCache: ConcurrentMap[String, StreamingQueryListener] = new ConcurrentHashMap() - private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): ExecuteHolder = { - val executeHolder = new ExecuteHolder(request, this) + /** Add ExecuteHolder to this session. Called only by SparkConnectExecutionManager. */ + private[service] def addExecuteHolder(executeHolder: ExecuteHolder): Unit = { val oldExecute = executions.putIfAbsent(executeHolder.operationId, executeHolder) if (oldExecute != null) { - throw new SparkSQLException( - errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS", - messageParameters = Map("handle" -> executeHolder.operationId)) + // the existance of this should alrady be checked by SparkConnectExecutionManager + throw new IllegalStateException( + s"ExecuteHolder with opId=${executeHolder.operationId} already exists!") } - executeHolder } - private[connect] def executeHolder(operationId: String): Option[ExecuteHolder] = { - Option(executions.get(operationId)) + /** Remove ExecuteHolder to this session. Called only by SparkConnectExecutionManager. 
*/ + private[service] def removeExecuteHolder(operationId: String): Unit = { + executions.remove(operationId) } - private[connect] def removeExecuteHolder(operationId: String): Unit = { - executions.remove(operationId) + private[connect] def executeHolder(operationId: String): Option[ExecuteHolder] = { + Option(executions.get(operationId)) } /** @@ -80,7 +79,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio * @return * list of operationIds of interrupted executions */ - private[connect] def interruptAll(): Seq[String] = { + private[service] def interruptAll(): Seq[String] = { val interruptedIds = new mutable.ArrayBuffer[String]() executions.asScala.values.foreach { execute => if (execute.interrupt()) { @@ -95,7 +94,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio * @return * list of operationIds of interrupted executions */ - private[connect] def interruptTag(tag: String): Seq[String] = { + private[service] def interruptTag(tag: String): Seq[String] = { val interruptedIds = new mutable.ArrayBuffer[String]() executions.asScala.values.foreach { execute => if (execute.sparkSessionTags.contains(tag)) { @@ -112,7 +111,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio * @return * list of operationIds of interrupted executions (one element or empty) */ - private[connect] def interruptOperation(operationId: String): Seq[String] = { + private[service] def interruptOperation(operationId: String): Seq[String] = { val interruptedIds = new mutable.ArrayBuffer[String]() Option(executions.get(operationId)).foreach { execute => if (execute.interrupt()) { @@ -252,6 +251,12 @@ object SessionHolder { /** Creates a dummy session holder for use in tests. 
*/ def forTesting(session: SparkSession): SessionHolder = { - SessionHolder(userId = "testUser", sessionId = UUID.randomUUID().toString, session = session) + val ret = + SessionHolder( + userId = "testUser", + sessionId = UUID.randomUUID().toString, + session = session) + SparkConnectService.putSessionForTesting(ret) + ret } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala index 9daf1e17b5e..1ab5f26f90b 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutePlanHandler.scala @@ -27,14 +27,15 @@ class SparkConnectExecutePlanHandler(responseObserver: StreamObserver[proto.Exec extends Logging { def handle(v: proto.ExecutePlanRequest): Unit = { - val sessionHolder = SparkConnectService - .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId) - val executeHolder = sessionHolder.createExecuteHolder(v) - - executeHolder.eventsManager.postStarted() - executeHolder.start() - val responseSender = - new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver) - executeHolder.attachAndRunGrpcResponseSender(responseSender) + val executeHolder = SparkConnectService.executionManager.createExecuteHolder(v) + try { + executeHolder.eventsManager.postStarted() + executeHolder.start() + val responseSender = + new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver) + executeHolder.runGrpcResponseSender(responseSender) + } finally { + executeHolder.afterInitialRPC() + } } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala new file mode 100644 index 00000000000..ce1f6c93f6c --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import com.google.common.cache.CacheBuilder + +import org.apache.spark.{SparkEnv, SparkSQLException} +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL} + +// Unique key identifying execution by combination of user, session and operation id +case class ExecuteKey(userId: String, sessionId: String, operationId: String) + +/** + * Global tracker of all ExecuteHolder executions. 
+ * + * All ExecuteHolders are created, and removed through it. It keeps track of all the executions, + * and removes executions that have been abandoned. + */ +private[connect] class SparkConnectExecutionManager() extends Logging { + + /** Hash table containing all current executions. Guarded by executionsLock. */ + private val executions: mutable.HashMap[ExecuteKey, ExecuteHolder] = + new mutable.HashMap[ExecuteKey, ExecuteHolder]() + private val executionsLock = new Object + + /** Graveyard of tombstones of executions that were abandoned and removed. */ + private val abandonedTombstones = CacheBuilder + .newBuilder() + .maximumSize(SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE)) + .build[ExecuteKey, ExecuteInfo]() + + /** None if there are any executions. Otherwise, the time when the last execution was removed. */ + private var lastExecutionTime: Option[Long] = Some(System.currentTimeMillis()) + + /** Executor for the periodic maintenance */ + private var scheduledExecutor: Option[ScheduledExecutorService] = None + + /** + * Create a new ExecuteHolder and register it with this global manager and with its session. + */ + private[connect] def createExecuteHolder(request: proto.ExecutePlanRequest): ExecuteHolder = { + val sessionHolder = SparkConnectService + .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getSessionId) + val executeHolder = new ExecuteHolder(request, sessionHolder) + executionsLock.synchronized { + // Check if the operation already exists, both in active executions, and in the graveyard + // of tombstones of executions that have been abandoned. + // The latter is to prevent double execution when a client retries execution, thinking it + // never reached the server, but in fact it did, and already got removed as abandoned.
+ if (executions.get(executeHolder.key).isDefined) { + if (getAbandonedTombstone(executeHolder.key).isDefined) { + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.OPERATION_ABANDONED", + messageParameters = Map("handle" -> executeHolder.operationId)) + } else { + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.OPERATION_ALREADY_EXISTS", + messageParameters = Map("handle" -> executeHolder.operationId)) + } + } + sessionHolder.addExecuteHolder(executeHolder) + executions.put(executeHolder.key, executeHolder) + lastExecutionTime = None + logInfo(s"ExecuteHolder ${executeHolder.key} is created.") + } + + schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started. + + executeHolder + } + + /** + * Remove an ExecuteHolder from this global manager and from its session. Interrupt the + * execution if still running, free all resources. + */ + private[connect] def removeExecuteHolder(key: ExecuteKey): Unit = { + var executeHolder: Option[ExecuteHolder] = None + executionsLock.synchronized { + executeHolder = executions.remove(key) + executeHolder.foreach(e => e.sessionHolder.removeExecuteHolder(e.operationId)) + if (executions.isEmpty) { + lastExecutionTime = Some(System.currentTimeMillis()) + } + logInfo(s"ExecuteHolder $key is removed.") + } + // close the execution outside the lock + executeHolder.foreach(_.close()) + } + + private[connect] def getExecuteHolder(key: ExecuteKey): Option[ExecuteHolder] = { + executionsLock.synchronized { + executions.get(key) + } + } + + /** Get info about abandoned execution, if there is one. */ + private[connect] def getAbandonedTombstone(key: ExecuteKey): Option[ExecuteInfo] = { + Option(abandonedTombstones.getIfPresent(key)) + } + + /** + * If there are no executions, return Left with System.currentTimeMillis of last active + * execution. Otherwise return Right with list of ExecuteInfo of all executions. 
+ */ + def listActiveExecutions: Either[Long, Seq[ExecuteInfo]] = executionsLock.synchronized { + if (executions.isEmpty) { + Left(lastExecutionTime.get) + } else { + Right(executions.values.map(_.getExecuteInfo).toBuffer.toSeq) + } + } + + /** + * Return list of executions that got abandoned and removed by periodic maintenance. This is a + * cache, and the tombstones will be eventually removed. + */ + def listAbandonedExecutions: Seq[ExecuteInfo] = { + abandonedTombstones.asMap.asScala.values.toBuffer.toSeq + } + + private[service] def shutdown(): Unit = executionsLock.synchronized { + scheduledExecutor.foreach { executor => + executor.shutdown() + executor.awaitTermination(1, TimeUnit.MINUTES) + } + scheduledExecutor = None + } + + /** + * Schedules periodic maintenance checks if it is not already scheduled. The checks are looking + * for executions that have not been closed, but are left with no RPC attached to them, and + * removes them after a timeout. + */ + private def schedulePeriodicChecks(): Unit = executionsLock.synchronized { + val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong + val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong + + scheduledExecutor match { + case Some(_) => // Already running. + case None => + logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms") + scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor()) + scheduledExecutor.get.scheduleAtFixedRate( + () => { + try periodicMaintenance(timeout) + catch { + case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex) + } + }, + interval, + interval, + TimeUnit.MILLISECONDS) + } + } + + // Visible for testing. + private[connect] def periodicMaintenance(timeout: Long): Unit = { + logInfo("Started periodic run of SparkConnectExecutionManager maintenance.") + + // Find any detached executions that expired and should be removed. 
+ val toRemove = new mutable.ArrayBuffer[ExecuteHolder]() + executionsLock.synchronized { + val nowMs = System.currentTimeMillis() + + executions.values.foreach { executeHolder => + executeHolder.lastAttachedRpcTime match { + case Some(detached) => + if (detached + timeout < nowMs) { + toRemove += executeHolder + } + case _ => // execution is active + } + } + } + if (!toRemove.isEmpty) { + // .. and remove them. + toRemove.foreach { executeHolder => + val info = executeHolder.getExecuteInfo + logInfo(s"Found execution $info that was abandoned and expired and will be removed.") + removeExecuteHolder(executeHolder.key) + abandonedTombstones.put(executeHolder.key, info) + } + } + logInfo("Finished periodic run of SparkConnectExecutionManager maintenance.") + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala index b70c82ab137..393b832de87 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReattachExecuteHandler.scala @@ -29,14 +29,25 @@ class SparkConnectReattachExecuteHandler( extends Logging { def handle(v: proto.ReattachExecuteRequest): Unit = { - val sessionHolder = SparkConnectService - .getIsolatedSession(v.getUserContext.getUserId, v.getSessionId) - val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse { - logDebug(s"Reattach operation not found: ${v.getOperationId}") - throw new SparkSQLException( - errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND", - messageParameters = Map("handle" -> v.getOperationId)) - } + val executeHolder = SparkConnectService.executionManager + .getExecuteHolder(ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId)) + 
.getOrElse { + if (SparkConnectService.executionManager + .getAbandonedTombstone( + ExecuteKey(v.getUserContext.getUserId, v.getSessionId, v.getOperationId)) + .isDefined) { + logDebug(s"Reattach operation abandoned: ${v.getOperationId}") + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.OPERATION_ABANDONED", + messageParameters = Map("handle" -> v.getOperationId)) + + } else { + logDebug(s"Reattach operation not found: ${v.getOperationId}") + throw new SparkSQLException( + errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND", + messageParameters = Map("handle" -> v.getOperationId)) + } + } if (!executeHolder.reattachable) { logWarning(s"Reattach to not reattachable operation.") throw new SparkSQLException( @@ -48,10 +59,10 @@ class SparkConnectReattachExecuteHandler( new ExecuteGrpcResponseSender[proto.ExecutePlanResponse](executeHolder, responseObserver) if (v.hasLastResponseId) { // start from response after lastResponseId - executeHolder.attachAndRunGrpcResponseSender(responseSender, v.getLastResponseId) + executeHolder.runGrpcResponseSender(responseSender, v.getLastResponseId) } else { // start from the start of the stream. 
- executeHolder.attachAndRunGrpcResponseSender(responseSender) + executeHolder.runGrpcResponseSender(responseSender) } } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala index 244aafb81ab..a3a7815609e 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectReleaseExecuteHandler.scala @@ -30,33 +30,36 @@ class SparkConnectReleaseExecuteHandler( def handle(v: proto.ReleaseExecuteRequest): Unit = { val sessionHolder = SparkConnectService .getIsolatedSession(v.getUserContext.getUserId, v.getSessionId) - val executeHolder = sessionHolder.executeHolder(v.getOperationId).getOrElse { - throw new SparkSQLException( - errorClass = "INVALID_HANDLE.OPERATION_NOT_FOUND", - messageParameters = Map("handle" -> v.getOperationId)) - } - if (!executeHolder.reattachable) { - throw new SparkSQLException( - errorClass = "INVALID_CURSOR.NOT_REATTACHABLE", - messageParameters = Map.empty) - } - - if (v.hasReleaseAll) { - executeHolder.close() - } else if (v.hasReleaseUntil) { - val responseId = v.getReleaseUntil.getResponseId - executeHolder.releaseUntilResponseId(responseId) - } else { - throw new UnsupportedOperationException(s"Unknown ReleaseExecute type!") - } - - val response = proto.ReleaseExecuteResponse + + val responseBuilder = proto.ReleaseExecuteResponse .newBuilder() .setSessionId(v.getSessionId) - .setOperationId(v.getOperationId) - .build() - responseObserver.onNext(response) + // ExecuteHolder may be concurrently released by SparkConnectExecutionManager if the + // ReleaseExecute arrived after it was abandoned and timed out. + // An asynchronous ReleaseUntil operation may also arrive after ReleaseAll.
+ // Because of that, make it noop and not fail if the ExecuteHolder is no longer there. + val executeHolderOption = + sessionHolder.executeHolder(v.getOperationId).foreach { executeHolder => + if (!executeHolder.reattachable) { + throw new SparkSQLException( + errorClass = "INVALID_CURSOR.NOT_REATTACHABLE", + messageParameters = Map.empty) + } + + responseBuilder.setOperationId(executeHolder.operationId) + + if (v.hasReleaseAll) { + SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key) + } else if (v.hasReleaseUntil) { + val responseId = v.getReleaseUntil.getResponseId + executeHolder.releaseUntilResponseId(responseId) + } else { + throw new UnsupportedOperationException(s"Unknown ReleaseExecute type!") + } + } + + responseObserver.onNext(responseBuilder.build()) responseObserver.onCompleted() } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 99e4e4d56ae..a7b5fbdcec0 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -278,6 +278,8 @@ object SparkConnectService extends Logging { private val userSessionMapping = cacheBuilder(CACHE_SIZE, CACHE_TIMEOUT_SECONDS).build[SessionCacheKey, SessionHolder]() + private[connect] val executionManager = new SparkConnectExecutionManager() + private[connect] val streamingSessionManager = new SparkConnectStreamingQueryCache() @@ -353,6 +355,13 @@ object SparkConnectService extends Logging { userSessionMapping.invalidateAll() } + /** + * Used for testing. 
+ */ + private[connect] def putSessionForTesting(sessionHolder: SessionHolder): Unit = { + userSessionMapping.put((sessionHolder.userId, sessionHolder.sessionId), sessionHolder) + } + private def newIsolatedSession(): SparkSession = { SparkSession.active.newSession() } @@ -414,6 +423,8 @@ object SparkConnectService extends Logging { server.shutdownNow() } } + streamingSessionManager.shutdown() + executionManager.shutdown() userSessionMapping.invalidateAll() uiTab.foreach(_.detach()) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index 04f3e15d38e..2050ebc01aa 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -133,10 +133,23 @@ private[connect] object ErrorUtils extends Logging { } partial .andThen { case (original, wrapped) => + if (events.isDefined) { + // Errors thrown inside execution are user query errors, return them as INFO. + logInfo( + s"Spark Connect error " + + s"during: $opType. UserId: $userId. SessionId: $sessionId.", + original) + } else { + // Other errors are server RPC errors, return them as ERROR. + logError( + s"Spark Connect RPC error " + + s"during: $opType. UserId: $userId. SessionId: $sessionId.", + original) + } + // If ExecuteEventsManager is present, this this is an execution error that needs to be // posted to it. events.foreach { executeEventsManager => - logError(s"Error during: $opType. UserId: $userId.
SessionId: $sessionId.", original) if (isInterrupted) { executeEventsManager.postCanceled() } else { diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 40d83b07b75..39b4f40215d 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.connect.common.InvalidPlanInput import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto -import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteStatus, SessionHolder, SessionStatus} +import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteStatus, SessionHolder, SessionStatus, SparkConnectService} import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} @@ -131,9 +131,10 @@ trait SparkConnectPlanTest extends SharedSparkSession { val request = proto.ExecutePlanRequest .newBuilder() .setPlan(plan) + .setSessionId(sessionHolder.sessionId) .setUserContext(context) .build() - val executeHolder = sessionHolder.createExecuteHolder(request) + val executeHolder = SparkConnectService.executionManager.createExecuteHolder(request) executeHolder.eventsManager.status_(ExecuteStatus.Started) executeHolder } diff --git a/docs/sql-error-conditions-invalid-handle-error-class.md b/docs/sql-error-conditions-invalid-handle-error-class.md index f8d3eab4d1c..c4cbb48035f 100644 --- a/docs/sql-error-conditions-invalid-handle-error-class.md +++ 
b/docs/sql-error-conditions-invalid-handle-error-class.md @@ -29,6 +29,10 @@ This error class has the following derived error classes: Handle must be an UUID string of the format '00112233-4455-6677-8899-aabbccddeeff' +## OPERATION_ABANDONED + +Operation was considered abandoned because of inactivity and removed. + ## OPERATION_ALREADY_EXISTS Operation already exists. diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index 70363922d8e..fa1868b489c 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -37,7 +37,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...] + b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"z\n\x0bUserContext\x12\x17 [...] 
) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -197,7 +197,7 @@ if _descriptor._USE_C_DESCRIPTORS == False: _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_start = 11110 _RELEASEEXECUTEREQUEST_RELEASEUNTIL._serialized_end = 11157 _RELEASEEXECUTERESPONSE._serialized_start = 11186 - _RELEASEEXECUTERESPONSE._serialized_end = 11276 - _SPARKCONNECTSERVICE._serialized_start = 11279 - _SPARKCONNECTSERVICE._serialized_end = 12022 + _RELEASEEXECUTERESPONSE._serialized_end = 11298 + _SPARKCONNECTSERVICE._serialized_start = 11301 + _SPARKCONNECTSERVICE._serialized_end = 12044 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index a886ecbd618..8fd5fa7a056 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -2684,18 +2684,35 @@ class ReleaseExecuteResponse(google.protobuf.message.Message): session_id: builtins.str """Session id in which the release was running.""" operation_id: builtins.str - """Operation id of the operation which the release concerns.""" + """Operation id of the operation on which the release executed. + If the operation couldn't be found (because e.g. it was concurrently released), will be unset. + Otherwise, it will be equal to the operation_id from request. + """ def __init__( self, *, session_id: builtins.str = ..., - operation_id: builtins.str = ..., + operation_id: builtins.str | None = ..., ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_operation_id", b"_operation_id", "operation_id", b"operation_id" + ], + ) -> builtins.bool: ... def ClearField( self, field_name: typing_extensions.Literal[ - "operation_id", b"operation_id", "session_id", b"session_id" + "_operation_id", + b"_operation_id", + "operation_id", + b"operation_id", + "session_id", + b"session_id", ], ) -> None: ... 
+ def WhichOneof( + self, oneof_group: typing_extensions.Literal["_operation_id", b"_operation_id"] + ) -> typing_extensions.Literal["operation_id"] | None: ... global___ReleaseExecuteResponse = ReleaseExecuteResponse diff --git a/python/pyspark/sql/connect/proto/base_pb2_grpc.py b/python/pyspark/sql/connect/proto/base_pb2_grpc.py index 2702ec71ff5..e6bfda8a40a 100644 --- a/python/pyspark/sql/connect/proto/base_pb2_grpc.py +++ b/python/pyspark/sql/connect/proto/base_pb2_grpc.py @@ -130,7 +130,7 @@ class SparkConnectServiceServicer(object): """Release an reattachable execution, or parts thereof. The ExecutePlan must have been started with ReattachOptions.reattachable=true. Non reattachable executions are released automatically and immediately after the ExecutePlan - RPC and ReleaseExecute doesn't need to be used. + RPC and ReleaseExecute may not be used. """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details("Method not implemented!") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org