juliuszsompolski commented on code in PR #44189:
URL: https://github.com/apache/spark/pull/44189#discussion_r1415893765
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala:
##########
@@ -309,12 +318,21 @@ class ReattachableExecuteSuite extends
SparkConnectServerTest {
.setOperationId(operationId)
.build()
val iter = stub.executePlan(
-      buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
+      buildExecutePlanRequest(buildPlan("select sleep(30000) as s"), operationId = operationId))
    // wait for execute holder to exist, but the execute thread may not have started yet.
Eventually.eventually(timeout(eventuallyTimeout)) {
      assert(SparkConnectService.executionManager.listExecuteHolders.length == 1)
}
stub.interrupt(interruptRequest)
+ // make sure the interrupt reaches the server
+ Eventually.eventually(timeout(eventuallyTimeout)) {
+      val execution = SparkConnectService.executionManager.listActiveExecutions match {
+ case Right(list) => list.find(_.operationId == operationId)
+ case Left(_) => None
+ }
+      assert(execution.isDefined && execution.get.status == ExecuteStatus.Canceled)
+ }
+ // make sure the client gets the OPERATION_CANCELED error
val e = intercept[StatusRuntimeException] {
while (iter.hasNext) iter.next()
Review Comment:
I realize this is not needed: the `sleep` query ensures that we don't get results to iterate through for 30 seconds, by which time the interrupt should have reached the server.
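
Under that reasoning, the added `Eventually` wait for `ExecuteStatus.Canceled` can be dropped and the test can rely on the blocking `sleep` query alone. A minimal sketch of that simplified flow, reusing the helpers already visible in the diff (`buildExecutePlanRequest`, `buildPlan`, `stub`, `interruptRequest`, and the `eventuallyTimeout` constant all come from the surrounding `ReattachableExecuteSuite` harness and are assumed here, not defined):

```scala
// Sketch only: assumes the ReattachableExecuteSuite / SparkConnectServerTest harness.
// The sleep(30000) query blocks result production, so the client cannot drain the
// iterator before the interrupt lands on the server.
val iter = stub.executePlan(
  buildExecutePlanRequest(buildPlan("select sleep(30000) as s"), operationId = operationId))
// wait for the execute holder to exist on the server
Eventually.eventually(timeout(eventuallyTimeout)) {
  assert(SparkConnectService.executionManager.listExecuteHolders.length == 1)
}
stub.interrupt(interruptRequest)
// the client blocks waiting for results until the interrupt takes effect,
// at which point OPERATION_CANCELED surfaces as a StatusRuntimeException
val e = intercept[StatusRuntimeException] {
  while (iter.hasNext) iter.next()
}
```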
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]