GitHub user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21384#discussion_r189970144
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
---
@@ -356,6 +356,22 @@ class ContinuousExecution(
}
}
}
+
+ /**
+ * Stops the query execution thread to terminate the query.
+ */
+ override def stop(): Unit = {
+ // Set the state to TERMINATED so that the batching thread knows that it was interrupted
+ // intentionally
+ state.set(TERMINATED)
+ if (queryExecutionThread.isAlive) {
+ // The query execution thread will clean itself up in the finally clause of runContinuous.
+ // We just need to interrupt the long running job.
+ queryExecutionThread.interrupt()
+ queryExecutionThread.join()
--- End diff --
Correct. The remaining one in the finally clause of runContinuous() is
sufficient, because jobs are only started within that method.
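
For illustration only, here is a minimal sketch of the interrupt-and-join pattern discussed above. It is not the Spark code: `StopSketch`, `runLoop`, `worker`, and `cleanedUp` are hypothetical names, and the sleep stands in for the long-running job. The point it shows is that when all work starts inside one method, a single cleanup in that method's finally clause covers every exit path, so stop() only has to set the state, interrupt, and join.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the pattern in the diff; not Spark's implementation.
object StopSketch {
  sealed trait State
  case object ACTIVE extends State
  case object TERMINATED extends State

  val state = new AtomicReference[State](ACTIVE)
  @volatile var cleanedUp = false

  // Stand-in for runContinuous(): all work starts here, so the finally
  // clause is the single place that needs to clean up.
  private def runLoop(): Unit = {
    try {
      while (state.get() == ACTIVE) {
        Thread.sleep(50) // placeholder for the long-running job
      }
    } catch {
      case _: InterruptedException if state.get() == TERMINATED =>
        // Interrupted intentionally by stop(); fall through to cleanup.
    } finally {
      cleanedUp = true // placeholder for whatever cleanup the real method does
    }
  }

  val worker = new Thread(() => runLoop(), "query-execution-sketch")

  def stop(): Unit = {
    // Mark the interruption as intentional before interrupting.
    state.set(TERMINATED)
    if (worker.isAlive) {
      worker.interrupt()
      worker.join() // returns only after the finally clause has run
    }
  }
}
```

Starting `StopSketch.worker` and then calling `StopSketch.stop()` leaves `cleanedUp` set to true without stop() doing any cleanup of its own.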