tdas commented on a change in pull request #26225: [SPARK-29568][SS] Stop
existing running streams when a new stream is launched
URL: https://github.com/apache/spark/pull/26225#discussion_r345538308
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
##########
@@ -343,27 +344,60 @@ class StreamingQueryManager private[sql] (sparkSession:
SparkSession) extends Lo
trigger,
triggerClock)
- activeQueriesLock.synchronized {
+ // The following code block checks if a stream with the same name or id is
running. Then it
+ // returns an Option of an already active stream to stop outside of the
lock
+ // to avoid a deadlock.
+ val activeRunOpt = activeQueriesSharedLock.synchronized {
// Make sure no other query with same name is active
userSpecifiedName.foreach { name =>
if (activeQueries.values.exists(_.name == name)) {
- throw new IllegalArgumentException(
- s"Cannot start query with name $name as a query with that name is
already active")
+ throw new IllegalArgumentException(s"Cannot start query with name
$name as a query " +
+ s"with that name is already active in this SparkSession")
}
}
// Make sure no other query with same id is active across all sessions
- val activeOption =
-
Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id,
this))
- if (activeOption.isDefined || activeQueries.values.exists(_.id ==
query.id)) {
- throw new IllegalStateException(
- s"Cannot start query with id ${query.id} as another query with same
id is " +
- s"already active. Perhaps you are attempting to restart a query
from checkpoint " +
- s"that is already active.")
+ val activeOption =
Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
+ .orElse(activeQueries.get(query.id)) // shouldn't be needed but
paranoia ...
+
+ val shouldStopActiveRun =
+
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+ if (activeOption.isDefined) {
+ if (shouldStopActiveRun) {
+ val oldQuery = activeOption.get
+ logWarning(s"Stopping existing streaming query [id=${query.id}, " +
+ s"runId=${oldQuery.runId}], as a new run is being started.")
+ Some(oldQuery)
+ } else {
+ throw new IllegalStateException(
+ s"Cannot start query with id ${query.id} as another query with
same id is " +
+ s"already active. Perhaps you are attempting to restart a query
from checkpoint " +
+ s"that is already active. You may stop the old query by setting
the SQL " +
+ "configuration: " +
+
s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true)
""" +
+ "and retry.")
+ }
+ } else {
+ // nothing to stop so, no-op
+ None
}
+ }
+ activeRunOpt.foreach(_.stop())
+
+ activeQueriesSharedLock.synchronized {
+ // We still can have a race condition when two concurrent instances try
to start the same
Review comment:
nit: This comment holds only when an existing active run was actually stopped,
so please qualify the comment accordingly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]