jaceklaskowski commented on code in PR #41440:
URL: https://github.com/apache/spark/pull/41440#discussion_r1234983255
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -829,6 +829,55 @@ class SparkContext(config: SparkConf) extends Logging {
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
+ /**
+ * Set the behavior of job cancellation from jobs started in this thread.
+ *
+ * @param interruptOnCancel If true, then job cancellation will result in
`Thread.interrupt()`
+ * being called on the job's executor threads. This is useful to help ensure
that the tasks
+ * are actually stopped in a timely manner, but is off by default due to
HDFS-1208, where HDFS
+ * may respond to Thread.interrupt() by marking nodes as dead.
+ */
+ def setInterruptOnCancel(interruptOnCancel: Boolean): Unit = {
+ setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
interruptOnCancel.toString)
+ }
+
+ /**
+ * Add a tag to be assigned to all the jobs started by this thread.
Review Comment:
nit: Adds
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -829,6 +829,55 @@ class SparkContext(config: SparkConf) extends Logging {
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
+ /**
+ * Set the behavior of job cancellation from jobs started in this thread.
+ *
+ * @param interruptOnCancel If true, then job cancellation will result in
`Thread.interrupt()`
+ * being called on the job's executor threads. This is useful to help ensure
that the tasks
+ * are actually stopped in a timely manner, but is off by default due to
HDFS-1208, where HDFS
+ * may respond to Thread.interrupt() by marking nodes as dead.
+ */
+ def setInterruptOnCancel(interruptOnCancel: Boolean): Unit = {
+ setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
interruptOnCancel.toString)
+ }
+
+ /**
+ * Add a tag to be assigned to all the jobs started by this thread.
+ *
+ * @param tagName The tag to be added. Cannot contain ',' (comma) character.
+ */
+ def addJobTag(tagName: String): Unit = {
Review Comment:
Just curious if `tag` alone were enough. I can't explain it but `tagName`
looks like `labelName` which in turn looks like `nameName`.
While we're at it (and since I dislike `Unit` as un-testable), can we return
`newTags`?
BTW, `newTags` not `newTagNames` seems prove my point above 😉
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -829,6 +829,55 @@ class SparkContext(config: SparkConf) extends Logging {
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
+ /**
+ * Set the behavior of job cancellation from jobs started in this thread.
+ *
+ * @param interruptOnCancel If true, then job cancellation will result in
`Thread.interrupt()`
+ * being called on the job's executor threads. This is useful to help ensure
that the tasks
+ * are actually stopped in a timely manner, but is off by default due to
HDFS-1208, where HDFS
+ * may respond to Thread.interrupt() by marking nodes as dead.
+ */
+ def setInterruptOnCancel(interruptOnCancel: Boolean): Unit = {
+ setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
interruptOnCancel.toString)
+ }
+
+ /**
+ * Add a tag to be assigned to all the jobs started by this thread.
+ *
+ * @param tagName The tag to be added. Cannot contain ',' (comma) character.
+ */
+ def addJobTag(tagName: String): Unit = {
+ SparkContext.throwIfInvalidTagName(tagName)
+ val existingTags = getJobTags()
+ val newTags = (existingTags +
tagName).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
+ setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
+ }
+
+ /**
+ * Remove a tag previously added to be assigned to all the jobs started by
this thread.
+ * Noop if such a tag was not added.
Review Comment:
nit: if the tag was not added earlier
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1085,6 +1085,15 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(JobGroupCancelled(groupId))
}
+ /**
+ * Cancel all jobs with a given tag.
+ */
+ def cancelJobsWithTag(tagName: String): Unit = {
+ SparkContext.throwIfInvalidTagName(tagName)
+ logInfo("Asked to cancel jobs with tag " + tagName)
Review Comment:
nit: s"Asked...$tagName"
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -2471,6 +2520,17 @@ class SparkContext(config: SparkConf) extends Logging {
dagScheduler.cancelJobGroup(groupId)
}
+ /**
+ * Cancel active jobs that have the specified tag. See
`org.apache.spark.SparkContext.addJobTag`.
Review Comment:
nit: `@see [addJobTag](#addJobTag)`
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1182,6 +1191,20 @@ private[spark] class DAGScheduler(
Option("part of cancelled job group %s".format(groupId))))
}
+ private[scheduler] def handleJobTagCancelled(tagName: String): Unit = {
+ // Cancel all jobs belonging that have this tag.
+ // First finds all active jobs with this group id, and then kill stages
for them.
+ val activeInGroup = activeJobs.filter { activeJob =>
+ Option(activeJob.properties).exists { properties =>
+
Option(properties.getProperty(SparkContext.SPARK_JOB_TAGS)).getOrElse("")
+ .split(SparkContext.SPARK_JOB_TAGS_SEP).toSet.contains(tagName)
+ }
+ }
+ val jobIds = activeInGroup.map(_.jobId)
+ jobIds.foreach(handleJobCancellation(_,
+ Option("part of cancelled job tag %s".format(tagName))))
Review Comment:
s"part of...$tagName"
##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1182,6 +1191,20 @@ private[spark] class DAGScheduler(
Option("part of cancelled job group %s".format(groupId))))
}
+ private[scheduler] def handleJobTagCancelled(tagName: String): Unit = {
+ // Cancel all jobs belonging that have this tag.
+ // First finds all active jobs with this group id, and then kill stages
for them.
+ val activeInGroup = activeJobs.filter { activeJob =>
+ Option(activeJob.properties).exists { properties =>
+
Option(properties.getProperty(SparkContext.SPARK_JOB_TAGS)).getOrElse("")
+ .split(SparkContext.SPARK_JOB_TAGS_SEP).toSet.contains(tagName)
+ }
+ }
+ val jobIds = activeInGroup.map(_.jobId)
Review Comment:
Why not join `activeInGroup` and `jobIds` so it'd be as follows:
```scala
val jobIds = activeJobs.filter { activeJob => ... }
.map(_.jobId)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]