zsxwing commented on a change in pull request #23156: [SPARK-24063][SS] Add
maximum epoch queue threshold for ContinuousExecution
URL: https://github.com/apache/spark/pull/23156#discussion_r266540366
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
##########
@@ -373,6 +382,35 @@ class ContinuousExecution(
}
}
+ /**
+ * Stores error and stops the query execution thread to terminate the query
in new thread.
+ */
+ def stopInNewThread(error: Throwable): Unit = {
+ if (failure.compareAndSet(null, error)) {
+ logError(s"Query $prettyIdString received exception $error")
+ stopInNewThread()
Review comment:
Looks like there is a race here. The query stop may happen before the
continuous-execution checks `failure` and the query will just stop without any
exception, just like someone stops a query manually.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]