[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r230730694
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try {
+              // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+              // killTask.
+              logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
+                "or zombie tasks for this job")
+              // ResultStage is only used by this job. It's safe to kill speculative or
+              // zombie tasks in this stage.
+              taskScheduler.killAllTaskAttempts(
--- End diff --

cc @jiangxb1987 IIRC we have some similar code in barrier execution. Shall 
we create a util method to safely kill tasks?
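
A minimal sketch of what such a shared helper could look like (the object name, method name, and log message here are made up for illustration; this is not code from this PR or from the barrier-execution path):

```scala
// Assumes this lives under org.apache.spark (Logging and TaskScheduler are private[spark]).
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.TaskScheduler

// Hypothetical utility: wraps killAllTaskAttempts so that callers do not each need
// their own try/catch for scheduler backends that do not implement killTask.
object TaskKillUtils extends Logging {
  def killAllTaskAttemptsSafely(
      taskScheduler: TaskScheduler,
      stageId: Int,
      interruptThread: Boolean,
      reason: String): Unit = {
    try {
      taskScheduler.killAllTaskAttempts(stageId, interruptThread, reason)
    } catch {
      // killAllTaskAttempts throws this when the SchedulerBackend does not implement killTask.
      case e: UnsupportedOperationException =>
        logInfo(s"Could not kill task attempts of stage $stageId", e)
    }
  }
}
```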


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22771


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228409787
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try {
+              // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+              // killTask.
+              logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
+                s"zombie tasks for this job")
--- End diff --

I created https://issues.apache.org/jira/browse/SPARK-25849 to improve the 
documentation.


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228371144
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try {
+              // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+              // killTask.
+              logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
+                s"zombie tasks for this job")
--- End diff --

Yes, other log messages probably also aren't very good. Maybe what we need 
is some additional explanation in the docs somewhere.

The issue I'm having is that if the log messages say that Tasks are being 
killed or cancelled, many users will assume that the Tasks are no longer running 
on the Executors. What it actually means is that the DAGScheduler isn't going to 
try to run them anymore; any previously started Tasks may or may not still be 
running, or continue to run, on the Executors -- that depends on whether the Tasks 
are interruptible and on whether the interrupt-on-cancel configuration is set to 
true. The log messages make sense once you understand that subtlety, so we should 
probably explain it more fully in the docs.
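
For reference, a small sketch of how interrupt-on-cancel is opted into from user code (the master URL, app name, and job group below are made up for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InterruptOnCancelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("interrupt-demo"))
    // With interruptOnCancel = true, cancelling jobs in this group asks executors to
    // Thread.interrupt() the running task threads. With the default (false), cancellation
    // only stops the scheduler from launching further attempts; tasks that already started
    // may keep running on the executors until they finish on their own.
    sc.setJobGroup("demo-group", "demo jobs", interruptOnCancel = true)
    // ... submit jobs here; sc.cancelJobGroup("demo-group") would then interrupt running
    // tasks, provided the task code is actually interruptible.
    sc.stop()
  }
}
```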


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228355772
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try {
+              // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+              // killTask.
+              logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
+                s"zombie tasks for this job")
--- End diff --

How about `... is finished. Cancelling ...`? That would be consistent with 
other places.


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228354020
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try {
+              // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+              // killTask.
+              logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " +
+                s"zombie tasks for this job")
--- End diff --

Isn't this misleading/confusing if !shouldInterruptTaskThread? You can 
attempt to kill speculative or zombie Tasks in that case, but nothing will 
actually happen if SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL is false.
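
Roughly, `shouldInterruptTaskThread` just reads that per-job property; a simplified, self-contained sketch (not the exact DAGScheduler code, and it folds in the "invalid value should not crash" fix this PR also makes):

```scala
import java.util.Properties
import scala.util.Try

// Simplified sketch of the check referred to above: the kill request only interrupts
// task threads on the executors when the job was submitted with
// spark.job.interruptOnCancel set to a value that parses as true.
object InterruptCheckSketch {
  def shouldInterruptTaskThread(jobProperties: Properties): Boolean = {
    Option(jobProperties)
      .map(_.getProperty("spark.job.interruptOnCancel", "false"))
      .exists(value => Try(value.toBoolean).getOrElse(false))
  }
}
```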


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-23 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r227503789
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,19 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
--- End diff --

comment "cancelTasks" no longer valid


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-23 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r227459990
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              logInfo(
+                s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
+              // ResultStage is only used by this job. It's safe to kill speculative or
+              // zombie tasks in this stage.
+              taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
--- End diff --

IIRC `cancelTasks()` will fail the stage (maybe that's okay here because the 
stage has already been marked completed). If we just want to kill speculative/zombie 
tasks, maybe we should call `killAllTaskAttempts()` instead?
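
For context, the two `TaskScheduler` operations being compared, abridged from the Spark source around this version (the comments paraphrase their behaviour; the real trait has more members):

```scala
// Abridged sketch of org.apache.spark.scheduler.TaskScheduler, shown only to
// illustrate the difference discussed above.
trait TaskScheduler {
  // Cancels a stage: kills its running tasks and also aborts the TaskSetManager,
  // which marks the stage (and its dependent jobs) as failed.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  // Kills all running attempts of the stage's tasks without aborting the task set,
  // so it is safe to call on a stage that has already completed successfully.
  def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
}
```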


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r227060028
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
     }
   }
+
+  test("cancel zombie tasks in a result stage when the job finishes") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1,2,1024]")
+      .setAppName("test-cluster")
+      .set("spark.ui.enabled", "false")
+      // Disable this so that if a task is running, we can make sure the executor will always send
+      // task metrics via heartbeat to driver.
+      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
+      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
+      .set("spark.executor.heartbeatInterval", "1s")
+    sc = new SparkContext(conf)
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+    @volatile var runningTaskIds: Seq[Long] = null
+    val listener = new SparkListener {
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
+          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.stageAttemptNumber == 0) {
+        if (context.partitionId == 0) {
+          // Make the first task in the first stage attempt fail.
+          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
+            new java.io.IOException("fake"))
+        } else {
+          // Make the second task in the first stage attempt sleep to generate a zombie task
+          Thread.sleep(60000)
+        }
+      } else {
+        // Make the second stage attempt successful.
+      }
+      x
+    }.collect()
+    sc.listenerBus.waitUntilEmpty(10000)
+    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
+    // whether there is any running task.
--- End diff --

I prefer this way because it makes sure the executor actually received the kill 
command and interrupted the tasks.
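
The quoted diff is cut off just before the assertion; a sketch of how the heartbeat-reported task ids can then be checked (the timeout value is an assumption):

```scala
// Continues the quoted test; assumes ScalaTest's Eventually and SpanSugar are in scope
// (SparkContextSuite mixes in Eventually), e.g.:
//   import org.scalatest.concurrent.Eventually._
//   import org.scalatest.time.SpanSugar._
eventually(timeout(10.seconds)) {
  // At least one executor heartbeat carrying accumulator updates must have arrived.
  assert(runningTaskIds != null)
  // Once the zombie task has been killed, no task should be reported as running.
  assert(runningTaskIds.isEmpty)
}
```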


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-20 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r226847830
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              logInfo(
+                s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
+              // ResultStage is only used by this job. It's safe to kill speculative or
+              // zombie tasks in this stage.
+              taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
--- End diff --

Thanks, @zsxwing. This looks promising for reducing the flakiness in some 
test suites.


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-20 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r226818782
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              logInfo(
+                s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
+              // ResultStage is only used by this job. It's safe to kill speculative or
+              // zombie tasks in this stage.
+              taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
+            } catch {
+              case e: UnsupportedOperationException =>
+                logInfo(s"Could not cancel tasks for stage $stageId", e)
--- End diff --

logWarn? aren't we leaking tasks?


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-19 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r226651308
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1364,6 +1385,16 @@ private[spark] class DAGScheduler(
           if (job.numFinished == job.numPartitions) {
             markStageAsFinished(resultStage)
             cleanupStateForJobAndIndependentStages(job)
+            try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
+              logInfo(
+                s"Job ${job.jobId} is finished. Killing speculative tasks for this job")
--- End diff --

The message should be updated: this kills more than speculative tasks, since it 
can also be tasks from other stage attempts.


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r226522963
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
     }
   }
+
+  test("cancel zombie tasks in a result stage when the job finishes") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1,2,1024]")
+      .setAppName("test-cluster")
+      .set("spark.ui.enabled", "false")
+      // Disable this so that if a task is running, we can make sure the executor will always send
+      // task metrics via heartbeat to driver.
+      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
+      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
+      .set("spark.executor.heartbeatInterval", "1s")
+    sc = new SparkContext(conf)
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+    @volatile var runningTaskIds: Seq[Long] = null
+    val listener = new SparkListener {
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
+          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.stageAttemptNumber == 0) {
+        if (context.partitionId == 0) {
+          // Make the first task in the first stage attempt fail.
+          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
+            new java.io.IOException("fake"))
+        } else {
+          // Make the second task in the first stage attempt sleep to generate a zombie task
+          Thread.sleep(60000)
+        }
+      } else {
+        // Make the second stage attempt successful.
+      }
+      x
+    }.collect()
+    sc.listenerBus.waitUntilEmpty(10000)
+    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
+    // whether there is any running task.
--- End diff --

any reason to do it this way, rather than using the TaskStart / TaskEnd 
events for a SparkListener?
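
For comparison, a sketch of the listener-based alternative being asked about here (class and method names are made up); the trade-off, per the reply further up in this archive, is that a TaskEnd event only reflects the driver-side bookkeeping, not whether the executor actually received the kill and interrupted the task:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Tracks task ids between TaskStart and TaskEnd events, purely from the driver's
// point of view.
class RunningTaskTracker extends SparkListener {
  private val running = mutable.Set[Long]()

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    running += taskStart.taskInfo.taskId
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    running -= taskEnd.taskInfo.taskId
  }

  def runningTaskIds: Set[Long] = synchronized { running.toSet }
}
```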


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-18 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22771

[SPARK-25773][Core]Cancel zombie tasks in a result stage when the job 
finishes

## What changes were proposed in this pull request?

When a job finishes, there may be some zombie tasks still running due to 
stage retry. Since a result stage will never be used by other jobs, running 
these tasks just wastes cluster resources. This PR asks the TaskScheduler to 
cancel the running tasks of a result stage once it has finished. Credits go to 
@srinathshankar who suggested this idea to me.
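
Pulling together the diff hunks quoted throughout this thread, the core DAGScheduler change looks roughly like the sketch below; the full killAllTaskAttempts argument list, the reason string, and the log level in the catch block are assumptions where the quoted diffs are truncated:

```scala
// Inside DAGScheduler, when the last partition of a ResultStage completes.
if (job.numFinished == job.numPartitions) {
  markStageAsFinished(resultStage)
  cleanupStateForJobAndIndependentStages(job)
  try {
    // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask.
    logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
      "or zombie tasks for this job")
    // ResultStage is only used by this job. It's safe to kill speculative or
    // zombie tasks in this stage.
    taskScheduler.killAllTaskAttempts(
      stageId, shouldInterruptTaskThread(job), reason = "Stage finished")
  } catch {
    case e: UnsupportedOperationException =>
      logWarning(s"Could not cancel tasks for stage $stageId", e)
  }
}
```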

This PR also fixes two minor issues while I'm touching DAGScheduler:
- Invalid spark.job.interruptOnCancel should not crash DAGScheduler.
- Non-fatal errors should not crash DAGScheduler.

## How was this patch tested?

The new unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25773

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22771


commit 581ea53b57cc9fc0e89f2d635422653cfdfcb27f
Author: Shixiong Zhu 
Date:   2018-10-16T22:07:04Z

Cancel zombie tasks in a result stage when the job finishes




---
