HyukjinKwon commented on code in PR #46819:
URL: https://github.com/apache/spark/pull/46819#discussion_r1625230907


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -194,14 +195,16 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    */
   private[service] def interruptTag(tag: String): Seq[String] = {
     val interruptedIds = new mutable.ArrayBuffer[String]()
+    val queries = SparkConnectService.streamingSessionManager.getTaggedQuery(tag, session)
+    queries.foreach(_.query.stop())

Review Comment:
   But it seems like we do cancel the jobs in that case:
   
   ```
     override def stop(): Unit = {
       // Set the state to TERMINATED so that the batching thread knows that it was interrupted
       // intentionally
       state.set(TERMINATED)
       if (queryExecutionThread.isAlive) {
         sparkSession.sparkContext.cancelJobGroup(runId.toString)
         interruptAndAwaitExecutionThreadTermination()
         // microBatchThread may spawn new jobs, so we need to cancel again to prevent a leak
         sparkSession.sparkContext.cancelJobGroup(runId.toString)
       }
       logInfo(log"Query ${MDC(LogKeys.PRETTY_ID_STRING, prettyIdString)} was stopped")
     }
   ```
   
   So I think we're all good.
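   The important detail in the quoted `stop()` is the "cancel, interrupt-and-join, cancel again" ordering: the execution thread can spawn new jobs between the first cancellation and its actual termination, so a second `cancelJobGroup` closes that window. A minimal self-contained sketch of the pattern (with a hypothetical counter standing in for `sparkSession.sparkContext.cancelJobGroup`, and a plain thread standing in for the query execution thread) looks like this:
   
   ```scala
   import java.util.concurrent.atomic.AtomicInteger
   
   object StopPattern {
     // Hypothetical stand-in for SparkContext.cancelJobGroup: just counts calls
     // so we can observe it runs both before and after the thread terminates.
     val cancels = new AtomicInteger(0)
     def cancelJobGroup(runId: String): Unit = cancels.incrementAndGet()
   
     def stop(queryThread: Thread, runId: String): Unit = {
       if (queryThread.isAlive) {
         cancelJobGroup(runId)   // cancel jobs started so far
         queryThread.interrupt() // ask the execution thread to stop
         queryThread.join()      // await execution thread termination
         cancelJobGroup(runId)   // the thread may have spawned jobs while dying
       }
     }
   
     def main(args: Array[String]): Unit = {
       // Simulated long-running query thread that exits when interrupted.
       val t = new Thread(() =>
         try Thread.sleep(60000)
         catch { case _: InterruptedException => () })
       t.start()
       stop(t, "run-1")
       println(cancels.get()) // 2
     }
   }
   ```
   
   This is only an illustration of the ordering, not of the actual `StreamExecution` internals; the real code also transitions `state` to `TERMINATED` so the batching thread knows the interrupt was intentional.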



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

