xupefei commented on code in PR #47815:
URL: https://github.com/apache/spark/pull/47815#discussion_r1734759963
##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -794,6 +814,120 @@ class SparkSession private(
}
}
+
+ /**
+ * Add a tag to be assigned to all the operations started by this thread in
this session.
+ *
+ * Often, a unit of execution in an application consists of multiple Spark
executions.
+ * Application programmers can use this method to group all those jobs
together and give a group
+ * tag. The application can use
`org.apache.spark.sql.SparkSession.interruptTag` to cancel all
+ * running executions with this tag. For example:
+ * {{{
+ * // In the main thread:
+ * spark.addTag("myjobs")
+ * spark.range(10).map(i => { Thread.sleep(10); i }).collect()
+ *
+ * // In a separate thread:
+ * spark.interruptTag("myjobs")
+ * }}}
+ *
+ * There may be multiple tags present at the same time, so different parts
of application may
+ * use different tags to perform cancellation at different levels of
granularity.
+ *
+ * @param tag
+ * The tag to be added. Cannot contain ',' (comma) character or be an
empty string.
+ *
+ * @since 4.0.0
+ */
+ def addTag(tag: String): Unit = {
+ SparkContext.throwIfInvalidTag(tag)
+ userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag")
+ }
+
+ /**
+ * Remove a tag previously added to be assigned to all the operations
started by this thread in
+ * this session. Noop if such a tag was not added earlier.
+ *
+ * @param tag
+ * The tag to be removed. Cannot contain ',' (comma) character or be an
empty string.
+ *
+ * @since 4.0.0
+ */
+ def removeTag(tag: String): Unit = {
+ SparkContext.throwIfInvalidTag(tag)
+ userDefinedToRealTagsMap.remove(tag)
+ }
+
+ /**
+ * Get the tags that are currently set to be assigned to all the operations
started by this
+ * thread.
+ *
+ * @since 4.0.0
+ */
+ def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet
+
+ /**
+ * Clear the current thread's operation tags.
+ *
+ * @since 4.0.0
+ */
+ def clearTags(): Unit = userDefinedToRealTagsMap.clear()
+
+ /**
+ * Request to interrupt all currently running operations of this session.
+ *
+ * @note This method will wait up to 60 seconds for the interruption request
to be issued.
+
+ * @return Sequence of job IDs requested to be interrupted.
+
+ * @since 4.0.0
+ */
+ def interruptAll(): Seq[String] = {
+ val cancelledIds = sparkContext.cancelAllJobs(
+ _.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID)
+ ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq
+ }
+
+ /**
+ * Request to interrupt all currently running operations of this session
with the given operation
+ * tag.
+ *
+ * @note This method will wait up to 60 seconds for the interruption request
to be issued.
+ *
+ * @return Sequence of job IDs requested to be interrupted.
+ */
+ def interruptTag(tag: String): Seq[String] = {
+ val realTag = userDefinedToRealTagsMap.get(tag)
+ if (realTag == null) return Seq.empty
+
+ val cancelledIds = sparkContext.cancelJobsWithTag(realTag, "Interrupted by
user", _ => true)
+ ThreadUtils.awaitResult(cancelledIds, 60.seconds).map(_.toString).toSeq
+ }
+
+ /**
+ * Request to interrupt an operation of this session, given its job ID.
+ *
+ * @note This method will wait up to 60 seconds for the interruption request
to be issued.
+ *
+ * @return The job ID requested to be interrupted, as a single-element
sequence, or an empty
+ * sequence if the operation is not started by this session.
+ *
+ * @since 4.0.0
+ */
+ def interruptOperation(jobId: String): Seq[String] = {
+ scala.util.Try(jobId.toInt).toOption match {
+ case Some(jobIdToBeCancelled) =>
+ val cancelledIds = sparkContext.cancelJob(
+ jobIdToBeCancelled,
+ "Interrupted by user",
+ shouldCancelJob =
_.properties.getProperty(SPARK_SESSION_UUID_PROPERTY_KEY) == sessionUUID
Review Comment:
Yes. We don't need this now :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]