xupefei commented on code in PR #47815:
URL: https://github.com/apache/spark/pull/47815#discussion_r1738508417


##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -794,6 +814,120 @@ class SparkSession private(
     }
   }
 
+
+  /**
+   * Add a tag to be assigned to all the operations started by this thread in 
this session.
+   *
+   * Often, a unit of execution in an application consists of multiple Spark 
executions.
+   * Application programmers can use this method to group all those jobs 
together and give a group
+   * tag. The application can use 
`org.apache.spark.sql.SparkSession.interruptTag` to cancel all
+   * running executions with this tag. For example:
+   * {{{
+   * // In the main thread:
+   * spark.addTag("myjobs")
+   * spark.range(10).map(i => { Thread.sleep(10); i }).collect()
+   *
+   * // In a separate thread:
+   * spark.interruptTag("myjobs")
+   * }}}
+   *
+   * There may be multiple tags present at the same time, so different parts 
of the application may
+   * use different tags to perform cancellation at different levels of 
granularity.
+   *
+   * @param tag
+   *   The tag to be added. Cannot contain ',' (comma) character or be an 
empty string.
+   *
+   * @since 4.0.0
+   */
+  def addTag(tag: String): Unit = {
+    SparkContext.throwIfInvalidTag(tag)
+    userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag")
+  }
+
+  /**
+   * Remove a tag previously added to be assigned to all the operations 
started by this thread in
+   * this session. Noop if such a tag was not added earlier.
+   *
+   * @param tag
+   *   The tag to be removed. Cannot contain ',' (comma) character or be an 
empty string.
+   *
+   * @since 4.0.0
+   */
+  def removeTag(tag: String): Unit = {
+    SparkContext.throwIfInvalidTag(tag)
+    userDefinedToRealTagsMap.remove(tag)
+  }
+
+  /**
+   * Get the tags that are currently set to be assigned to all the operations 
started by this
+   * thread.
+   *
+   * @since 4.0.0
+   */
+  def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet
+
+  /**
+   * Clear the current thread's operation tags.
+   *
+   * @since 4.0.0
+   */
+  def clearTags(): Unit = userDefinedToRealTagsMap.clear()
+
+  /**
+   * Request to interrupt all currently running operations of this session.
+   *
+   * @note This method will wait up to 60 seconds for the interruption request 
to be issued.
+   *
+   * @return Sequence of job IDs requested to be interrupted.
+   *
+   * @since 4.0.0
+   */
+  def interruptAll(): Seq[String] = {
+    val cancelledIds = sparkContext.cancelAllJobs(
+      _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID)
+    ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq
+  }
+
+  /**
+   * Request to interrupt all currently running operations of this session 
with the given operation
+   * tag.
+   *
+   * @note This method will wait up to 60 seconds for the interruption request 
to be issued.
+   *
+   * @return Sequence of job IDs requested to be interrupted.
+   */
+  def interruptTag(tag: String): Seq[String] = {
+    val realTag = userDefinedToRealTagsMap.get(tag)
+    if (realTag == null) return Seq.empty
+
+    val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by 
user", _ => true)
+    ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq
+  }
+
+  /**
+   * Request to interrupt an operation of this session, given its job ID.
+   *
+   * @note This method will wait up to 60 seconds for the interruption request 
to be issued.
+   *
+   * @return The job ID requested to be interrupted, as a single-element 
sequence, or an empty
+   *    sequence if the operation is not started by this session.
+   *
+   * @since 4.0.0
+   */
+  def interruptOperation(jobId: String): Seq[String] = {

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to