Repository: spark
Updated Branches:
  refs/heads/master f99401a63 -> 2c557837b


SPARK-1202 - Add a "cancel" button in the UI for stages

Author: Sundeep Narravula <sundeepn@superduel.local>
Author: Sundeep Narravula <sunde...@dhcpx-204-110.corp.yahoo.com>

Closes #246 from sundeepn/uikilljob and squashes the following commits:

5fdd0e2 [Sundeep Narravula] Fix test string
f6fdff1 [Sundeep Narravula] Format fix; reduced line size to less than 100 chars
d1daeb9 [Sundeep Narravula] Incorporating review comments.
8d97923 [Sundeep Narravula] Ability to kill jobs thru the UI. This behavior can 
be turned on by setting the following variable: spark.ui.killEnabled=true 
(default=false) Adding DAGScheduler event StageCancelled and corresponding 
handlers. Added cancellation reason to handlers.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c557837
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c557837
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c557837

Branch: refs/heads/master
Commit: 2c557837b4a12c644cc37bd00d02be04f3807637
Parents: f99401a
Author: Sundeep Narravula <sundeepn@superduel.local>
Authored: Thu Apr 10 17:10:11 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Thu Apr 10 17:10:11 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 10 ++++++
 .../apache/spark/scheduler/DAGScheduler.scala   | 32 +++++++++++++++++---
 .../spark/scheduler/DAGSchedulerEvent.scala     |  2 ++
 .../scala/org/apache/spark/ui/SparkUI.scala     |  1 +
 .../org/apache/spark/ui/jobs/IndexPage.scala    | 14 ++++++++-
 .../apache/spark/ui/jobs/JobProgressUI.scala    |  1 +
 .../org/apache/spark/ui/jobs/StagePage.scala    |  1 +
 .../org/apache/spark/ui/jobs/StageTable.scala   | 29 ++++++++++++++----
 .../spark/scheduler/DAGSchedulerSuite.scala     |  2 +-
 docs/configuration.md                           |  7 +++++
 10 files changed, 87 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e6c9b70..3bcc8ce 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1138,6 +1138,16 @@ class SparkContext(config: SparkConf) extends Logging {
     dagScheduler.cancelAllJobs()
   }
 
+  /** Cancel the job with the given id, if it is scheduled or running, via the DAG scheduler. */
+  private[spark] def cancelJob(jobId: Int) {
+    dagScheduler.cancelJob(jobId)
+  }
+
+  /** Cancel a given stage and all jobs associated with it, via the DAG scheduler. */
+  private[spark] def cancelStage(stageId: Int) {
+    dagScheduler.cancelStage(stageId)
+  }
+
   /**
    * Clean a closure to make it ready to serialized and send to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c41d6d7..c6cbf14 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -512,6 +512,13 @@ class DAGScheduler(
   }
 
   /**
+   * Cancel all jobs associated with a running or scheduled stage (posts StageCancelled).
+   */
+  def cancelStage(stageId: Int) {
+    eventProcessActor ! StageCancelled(stageId)
+  }
+
+  /**
    * Process one event retrieved from the event processing actor.
    *
    * @param event The event to be processed.
@@ -551,6 +558,9 @@ class DAGScheduler(
           submitStage(finalStage)
         }
 
+      case StageCancelled(stageId) =>
+        handleStageCancellation(stageId)
+
       case JobCancelled(jobId) =>
         handleJobCancellation(jobId)
 
@@ -560,11 +570,13 @@ class DAGScheduler(
         val activeInGroup = activeJobs.filter(activeJob =>
           groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
         val jobIds = activeInGroup.map(_.jobId)
-        jobIds.foreach(handleJobCancellation)
+        jobIds.foreach(jobId => handleJobCancellation(jobId,
+          "as part of cancelled job group %s".format(groupId)))
 
       case AllJobsCancelled =>
         // Cancel all running jobs.
-        runningStages.map(_.jobId).foreach(handleJobCancellation)
+        runningStages.map(_.jobId).foreach(jobId => 
handleJobCancellation(jobId,
+          "as part of cancellation of all jobs"))
         activeJobs.clear()      // These should already be empty by this point,
         jobIdToActiveJob.clear()   // but just in case we lost track of some 
jobs...
 
@@ -991,11 +1003,23 @@ class DAGScheduler(
     }
   }
 
-  private def handleJobCancellation(jobId: Int) {
+  private def handleStageCancellation(stageId: Int) { // cancel every job that uses stageId
+    if (stageIdToJobIds.contains(stageId)) {
+      val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
+      jobsThatUseStage.foreach(jobId => { // iterate over the toArray snapshot taken above
+        handleJobCancellation(jobId, "because Stage %s was 
cancelled".format(stageId))
+      })
+    } else { // stageId has no entry in stageIdToJobIds
+      logInfo("No active jobs to kill for Stage " + stageId)
+    }
+  }
+
+  private def handleJobCancellation(jobId: Int, reason: String = "") {
     if (!jobIdToStageIds.contains(jobId)) {
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
-      failJobAndIndependentStages(jobIdToActiveJob(jobId), s"Job $jobId 
cancelled", None)
+      failJobAndIndependentStages(jobIdToActiveJob(jobId),
+        "Job %d cancelled %s".format(jobId, reason), None) // empty reason leaves a trailing space
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 293cfb6..7367c08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -44,6 +44,8 @@ private[scheduler] case class JobSubmitted(
     properties: Properties = null)
   extends DAGSchedulerEvent
 
+private[scheduler] case class StageCancelled(stageId: Int) extends 
DAGSchedulerEvent
+
 private[scheduler] case class JobCancelled(jobId: Int) extends 
DAGSchedulerEvent
 
 private[scheduler] case class JobGroupCancelled(groupId: String) extends 
DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b8e6e15..dac11ec 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -46,6 +46,7 @@ private[spark] class SparkUI(
   val live = sc != null
 
   val securityManager = if (live) sc.env.securityManager else new 
SecurityManager(conf)
+  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
 
   private val localHost = Utils.localHostName()
   private val publicHost = 
Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index f811aff..5da5d1f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -32,6 +32,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
   private val sc = parent.sc
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
+  private val killEnabled = parent.killEnabled
 
   private def appName = parent.appName
 
@@ -42,7 +43,18 @@ private[ui] class IndexPage(parent: JobProgressUI) {
       val failedStages = listener.failedStages.reverse.toSeq
       val now = System.currentTimeMillis()
 
-      val activeStagesTable = new 
StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+      if (killEnabled) {
+        val killFlag = 
Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+        val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+
+        if (stageId >= 0 && killFlag && 
listener.activeStages.contains(stageId)) {
+          sc.cancelStage(stageId)
+        }
+      }
+
+
+      val activeStagesTable =
+        new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, 
parent.killEnabled)
       val completedStagesTable =
         new StageTable(completedStages.sortBy(_.submissionTime).reverse, 
parent)
       val failedStagesTable = new 
StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index ad1a12c..9de659d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -32,6 +32,7 @@ private[ui] class JobProgressUI(parent: SparkUI) {
   val basePath = parent.basePath
   val live = parent.live
   val sc = parent.sc
+  val killEnabled = parent.killEnabled
 
   lazy val listener = _listener.get
   lazy val isFairScheduler = listener.schedulingMode.exists(_ == 
SchedulingMode.FAIR)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0bcbd74..b6c3e3c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.{Utils, Distribution}
 private[ui] class StagePage(parent: JobProgressUI) {
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
+  private lazy val sc = parent.sc
 
   private def appName = parent.appName
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index ac61568..1e874ae 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
-private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
+private[ui] class StageTable(
+  stages: Seq[StageInfo],
+  parent: JobProgressUI,
+  killEnabled: Boolean = false) {
+
   private val basePath = parent.basePath
   private lazy val listener = parent.listener
   private lazy val isFairScheduler = parent.isFairScheduler
@@ -71,15 +75,28 @@ private[ui] class StageTable(stages: Seq[StageInfo], 
parent: JobProgressUI) {
     </div>
   }
 
-  /** Render an HTML row that represents a stage */
-  private def stageRow(s: StageInfo): Seq[Node] = {
-    val poolName = listener.stageIdToPool.get(s.stageId)
+  private def makeDescription(s: StageInfo): Seq[Node] = { // builds the description cell
     val nameLink =
       <a 
href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), 
s.stageId)}>
         {s.name}
       </a>
+    val killLink = if (killEnabled) { // no else: the value is Unit when kill is disabled
+      <div>[<a href=
+        
{"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), 
s.stageId)}>
+          Kill
+      </a>]</div>
+
+    }
     val description = listener.stageIdToDescription.get(s.stageId)
-      .map(d => 
<div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+      .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
+      .getOrElse(<div>{nameLink} {killLink}</div>)
+
+    return description
+  }
+
+  /** Render an HTML row that represents a stage */
+  private def stageRow(s: StageInfo): Seq[Node] = {
+    val poolName = listener.stageIdToPool.get(s.stageId)
     val submissionTime = s.submissionTime match {
       case Some(t) => WebUI.formatDate(new Date(t))
       case None => "Unknown"
@@ -118,7 +135,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], 
parent: JobProgressUI) {
           </a>
         </td>
       }}
-      <td>{description}</td>
+      <td>{makeDescription(s)}</td>
       <td valign="middle">{submissionTime}</td>
       <td 
sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
       <td class="progress-cell">

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a74724d..db4df1d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -290,7 +290,7 @@ class DAGSchedulerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCont
     val rdd = makeRdd(1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
-    assert(failure.getMessage === s"Job $jobId cancelled")
+    assert(failure.getMessage === s"Job $jobId cancelled ")
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(sparkListener.failedStages.contains(0))
     assert(sparkListener.failedStages.size === 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/2c557837/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 9c60240..f3bfd03 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -191,6 +191,13 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
 </tr>
 <tr>
+  <td>spark.ui.killEnabled</td>
+  <td>true</td>
+  <td>
+    Allows stages and corresponding jobs to be killed from the web UI.
+  </td>
+</tr>
+<tr>
   <td>spark.shuffle.compress</td>
   <td>true</td>
   <td>

Reply via email to