juliuszsompolski commented on code in PR #42304:
URL: https://github.com/apache/spark/pull/42304#discussion_r1283751725
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala:
##########
@@ -493,24 +493,37 @@ class SparkConnectServiceSuite extends SharedSparkSession
with MockitoSugar with
.setSessionId(sessionId)
.build()
- // The observer is executed inside this thread. So
- // we can perform the checks inside the observer.
+ // Even though the observer is executed inside this thread, this thread is also executing
+ // the SparkConnectService. If we throw an exception inside it, it will be caught by
+ // the ErrorUtils.handleError wrapping instance.executePlan and turned into an onError
+ // call with StatusRuntimeException, which will be eaten here.
+ var failures: mutable.ArrayBuffer[String] = new mutable.ArrayBuffer[String]()
instance.executePlan(
request,
new StreamObserver[proto.ExecutePlanResponse] {
override def onNext(v: proto.ExecutePlanResponse): Unit = {
- fail("this should not receive responses")
+ // The query receives some pre-execution responses such as schema, but should
+ // never proceed to execution and get query results.
+ if (v.hasArrowBatch) {
+ failures += s"this should not receive query results but got $v"
+ }
}
override def onError(throwable: Throwable): Unit = {
- assert(throwable.isInstanceOf[StatusRuntimeException])
- verifyEvents.onError(throwable)
+ try {
+ assert(throwable.isInstanceOf[StatusRuntimeException])
+ verifyEvents.onError(throwable)
Review Comment:
It started failing because `verifyEvents` interacted badly with the change in
this PR:
- The error thrown from onNext propagated up to SparkConnectExecutePlanHandler
and on to executeHolder.close(), because this execution is non-reattachable (a
reattachable execution would not get killed because of a GRPC thread error,
but a non-reattachable one does).
- At that point, this PR makes it interrupt the query (it was actually a "bug"
of non-reattachable execution that the query kept running even if the RPC
died).
- That caused a canceled event to be propagated to eventManager.
- The error then went up to handleError, which triggered this verifyEvents
error here.
This would not happen in real life, because an onError on the RPC thread
(which is the onError here) would not be the one posting the error event.
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala:
##########
@@ -493,24 +493,37 @@ class SparkConnectServiceSuite extends SharedSparkSession
with MockitoSugar with
.setSessionId(sessionId)
.build()
- // The observer is executed inside this thread. So
- // we can perform the checks inside the observer.
+ // Even though the observer is executed inside this thread, this thread is also executing
+ // the SparkConnectService. If we throw an exception inside it, it will be caught by
+ // the ErrorUtils.handleError wrapping instance.executePlan and turned into an onError
+ // call with StatusRuntimeException, which will be eaten here.
Review Comment:
This test never worked, for the reasons described in the code comment above.
It threw an exception from onNext upon receiving a schema response, before
execution ever started. That exception was caught by handleError and turned
into a StatusRuntimeException, which onError happily accepted.
So this test never reached the insta_kill UDF.
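To illustrate the pitfall outside of Spark, here is a minimal sketch. The
`Observer` trait and the `executePlan` function below are hypothetical
stand-ins (not the real gRPC StreamObserver or SparkConnectService); the only
assumption is that the service wraps the call in a catch-all handler that turns
any exception into an onError call, as handleError is described to do above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stream observer, so the sketch has no gRPC dependency.
trait Observer[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
}

object ObserverPitfall {
  // Hypothetical stand-in for the service. Like a handleError-style wrapper, it catches
  // anything thrown while delivering responses (including exceptions thrown by the
  // observer itself) and reports it back through onError.
  def executePlan(responses: Seq[String], observer: Observer[String]): Unit = {
    try {
      responses.foreach(r => observer.onNext(r))
    } catch {
      case e: Throwable =>
        observer.onError(new RuntimeException("wrapped: " + e.getMessage, e))
    }
  }

  // Anti-pattern: the assertion thrown from onNext on the first (schema) response is
  // converted into an onError call, which the test was expecting anyway.
  def runThrowing(): Boolean = {
    var sawError = false
    executePlan(Seq("schema", "batch"), new Observer[String] {
      override def onNext(v: String): Unit =
        throw new AssertionError("this should not receive responses")
      override def onError(t: Throwable): Unit = { sawError = true }
    })
    sawError
  }

  // Pattern used in the diff: record failures and check them after the call returns.
  def runCollecting(): Seq[String] = {
    val failures = mutable.ArrayBuffer[String]()
    executePlan(Seq("schema", "batch"), new Observer[String] {
      override def onNext(v: String): Unit = {
        if (v == "batch") {
          failures += s"this should not receive query results but got $v"
        }
      }
      override def onError(t: Throwable): Unit = ()
    })
    failures.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(runThrowing())
    println(runCollecting())
  }
}
```

In the first variant the test "passes" without the second response ever being
delivered; in the second, the unexpected response is recorded and can be
asserted on from the test body after executePlan returns.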
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]