hvanhovell commented on code in PR #47815:
URL: https://github.com/apache/spark/pull/47815#discussion_r1723408463


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -794,6 +795,108 @@ class SparkSession private(
     }
   }
 
+
+  /**
+   * Add a tag to be assigned to all the operations started by this thread in 
this session.
+   *
+   * Often, a unit of execution in an application consists of multiple Spark 
executions.
+   * Application programmers can use this method to group all those jobs 
together and give a group
+   * tag. The application can use 
`org.apache.spark.sql.SparkSession.interruptTag` to cancel all
+   * running executions with this tag. For example:
+   * {{{
+   * // In the main thread:
+   * spark.addTag("myjobs")
+   * spark.range(10).map(i => { Thread.sleep(10); i }).collect()
+   *
+   * // In a separate thread:
+   * spark.interruptTag("myjobs")
+   * }}}
+   *
+   * There may be multiple tags present at the same time, so different parts 
of application may
+   * use different tags to perform cancellation at different levels of 
granularity.
+   *
+   * @param tag
+   *   The tag to be added. Cannot contain ',' (comma) character or be an 
empty string.
+   *
+   * @since 4.0.0
+   */
+  def addTag(tag: String): Unit = sparkContext.addJobTag(tag)
+
+  /**
+   * Remove a tag previously added to be assigned to all the operations 
started by this thread in
+   * this session. Noop if such a tag was not added earlier.
+   *
+   * @param tag
+   *   The tag to be removed. Cannot contain ',' (comma) character or be an 
empty string.
+   *
+   * @since 4.0.0
+   */
+  def removeTag(tag: String): Unit = sparkContext.removeJobTag(tag)
+
+  /**
+   * Get the tags that are currently set to be assigned to all the operations 
started by this
+   * thread.
+   *
+   * @since 4.0.0
+   */
+  def getTags(): Set[String] = sparkContext.getJobTags()
+
+  /**
+   * Clear the current thread's operation tags.
+   *
+   * @since 4.0.0
+   */
+  def clearTags(): Unit = sparkContext.clearJobTags()
+
+  /**
+   * Interrupt all operations of this session that are currently running.
+   *
+   * @return
+   *   sequence of Job IDs of interrupted operations.
+   *
+   * @since 4.0.0
+   */
+  def interruptAll(): Seq[String] = {

Review Comment:
   This currently seems to cancel all running queries, not just the ones owned 
by this session.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to