tdas commented on a change in pull request #26225: [SPARK-29568][SS] Stop 
existing running streams when a new stream is launched
URL: https://github.com/apache/spark/pull/26225#discussion_r345538528
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ##########
 @@ -343,27 +344,60 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
       trigger,
       triggerClock)
 
-    activeQueriesLock.synchronized {
+    // The following code block checks if a stream with the same name or id is 
running. Then it
+    // returns an Option of an already active stream to stop outside of the 
lock
+    // to avoid a deadlock.
+    val activeRunOpt = activeQueriesSharedLock.synchronized {
       // Make sure no other query with same name is active
       userSpecifiedName.foreach { name =>
         if (activeQueries.values.exists(_.name == name)) {
-          throw new IllegalArgumentException(
-            s"Cannot start query with name $name as a query with that name is 
already active")
+          throw new IllegalArgumentException(s"Cannot start query with name 
$name as a query " +
+            s"with that name is already active in this SparkSession")
         }
       }
 
       // Make sure no other query with same id is active across all sessions
-      val activeOption =
-        
Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, 
this))
-      if (activeOption.isDefined || activeQueries.values.exists(_.id == 
query.id)) {
-        throw new IllegalStateException(
-          s"Cannot start query with id ${query.id} as another query with same 
id is " +
-            s"already active. Perhaps you are attempting to restart a query 
from checkpoint " +
-            s"that is already active.")
+      val activeOption = 
Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
+        .orElse(activeQueries.get(query.id)) // shouldn't be needed but 
paranoia ...
+
+      val shouldStopActiveRun =
+        
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
+      if (activeOption.isDefined) {
+        if (shouldStopActiveRun) {
+          val oldQuery = activeOption.get
+          logWarning(s"Stopping existing streaming query [id=${query.id}, " +
+            s"runId=${oldQuery.runId}], as a new run is being started.")
+          Some(oldQuery)
+        } else {
+          throw new IllegalStateException(
+            s"Cannot start query with id ${query.id} as another query with 
same id is " +
+              s"already active. Perhaps you are attempting to restart a query 
from checkpoint " +
+              s"that is already active. You may stop the old query by setting 
the SQL " +
+              "configuration: " +
+              
s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true) 
""" +
+              "and retry.")
+        }
+      } else {
+        // nothing to stop so, no-op
+        None
       }
+    }
 
+    activeRunOpt.foreach(_.stop())
 
 Review comment:
   Please document here that stop() will automatically clear the 
`activeStreamingQueries`. This behavior is implicit and easy to miss; without 
it being documented, it is hard to reason about this code. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to