gaborgsomogyi commented on a change in pull request #23156: [SPARK-24063][SS]
Add maximum epoch queue threshold for ContinuousExecution
URL: https://github.com/apache/spark/pull/23156#discussion_r259337663
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
##########
@@ -390,6 +398,41 @@ class ContinuousExecution(
}
}
+ /**
+ * Stores error and stops the query execution thread to terminate the query
in new thread.
+ */
+ def stopInNewThread(error: Throwable): Unit = {
+ failureLock.synchronized {
+ failure match {
+ case None =>
+ logError(s"Query $prettyIdString received exception $error")
+ failure = Some(error)
+ stopInNewThread()
+ case _ =>
+ // Stop already initiated
+ }
+ }
+ }
+
+ /**
+ * Stops the query execution thread to terminate the query in new thread.
+ */
+ private def stopInNewThread(): Unit = {
+ new Thread("stop-continuous-execution") {
Review comment:
Hmmm, seems like the result has to be abandoned as well so a new function is
required if we want this to be common.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]