xupefei commented on code in PR #47815:
URL: https://github.com/apache/spark/pull/47815#discussion_r1734187514
##########
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -794,6 +814,120 @@ class SparkSession private(
}
}
+
+ /**
+ * Add a tag to be assigned to all the operations started by this thread in this session.
+ *
+ * Often, a unit of execution in an application consists of multiple Spark executions.
+ * Application programmers can use this method to group all those jobs together and give a group
+ * tag. The application can use `org.apache.spark.sql.SparkSession.interruptTag` to cancel all
+ * running executions with this tag. For example:
+ * {{{
+ * // In the main thread:
+ * spark.addTag("myjobs")
+ * spark.range(10).map(i => { Thread.sleep(10); i }).collect()
+ *
+ * // In a separate thread:
+ * spark.interruptTag("myjobs")
+ * }}}
+ *
+ * There may be multiple tags present at the same time, so different parts of application may
+ * use different tags to perform cancellation at different levels of granularity.
+ *
+ * @param tag
+ * The tag to be added. Cannot contain ',' (comma) character or be an empty string.
+ *
+ * @since 4.0.0
+ */
+ def addTag(tag: String): Unit = {
+ SparkContext.throwIfInvalidTag(tag)
+ userDefinedToRealTagsMap.put(tag, s"spark-session-$sessionUUID-$tag")
+ }
+
+ /**
+ * Remove a tag previously added to be assigned to all the operations started by this thread in
+ * this session. Noop if such a tag was not added earlier.
+ *
+ * @param tag
+ * The tag to be removed. Cannot contain ',' (comma) character or be an empty string.
+ *
+ * @since 4.0.0
+ */
+ def removeTag(tag: String): Unit = {
+ SparkContext.throwIfInvalidTag(tag)
+ userDefinedToRealTagsMap.remove(tag)
+ }
+
+ /**
+ * Get the tags that are currently set to be assigned to all the operations started by this
+ * thread.
+ *
+ * @since 4.0.0
+ */
+ def getTags(): Set[String] = userDefinedToRealTagsMap.keys().asScala.toSet
+
+ /**
+ * Clear the current thread's operation tags.
+ *
+ * @since 4.0.0
+ */
+ def clearTags(): Unit = userDefinedToRealTagsMap.clear()
+
+ /**
+ * Request to interrupt all currently running operations of this session.
+ *
+ * @note This method will wait up to 60 seconds for the interruption request to be issued.
+ *
+ * @return Sequence of job IDs requested to be interrupted.
+ *
+ * @since 4.0.0
+ */
+ def interruptAll(): Seq[String] = {
+ val cancelledIds = sparkContext.cancelAllJobs(
Review Comment:
I tried this approach before. The problem is that adding a tag would break many tests, as they assume a job has no tags unless one is explicitly added.
We could make those tests happy by prefixing the tag with a special marker such as `__hidden_tag__` and not returning such tags by default.
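
Something along these lines, as a rough sketch only: the object and helper names below are made up for illustration, and it builds on the existing `SparkContext.addJobTag` / `getJobTags` APIs rather than anything in this PR.

```scala
import org.apache.spark.SparkContext

// Sketch: hide the implicit session tag behind a reserved prefix so jobs keep
// looking "untagged" to existing tests unless the user explicitly added a tag.
object HiddenTags {
  // Reserved marker; user code is not expected to use tags starting with it.
  val HiddenTagPrefix = "__hidden_tag__"

  // Tag every job started on behalf of a session with a hidden, session-scoped tag.
  def addSessionTag(sc: SparkContext, sessionUUID: String): Unit =
    sc.addJobTag(s"$HiddenTagPrefix-spark-session-$sessionUUID")

  // A user-facing getTags() would filter hidden tags out by default, so tests
  // that expect an empty tag set keep passing.
  def visibleTags(sc: SparkContext): Set[String] =
    sc.getJobTags().filterNot(_.startsWith(HiddenTagPrefix))
}
```

With something like this, `getTags()` stays empty for jobs with no user-added tags, while cancellation can still target the hidden session tag internally.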