Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r227060028
  
    --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
    @@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
           assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
         }
       }
    +
    +  test("cancel zombie tasks in a result stage when the job finishes") {
    +    val conf = new SparkConf()
    +      .setMaster("local-cluster[1,2,1024]")
    +      .setAppName("test-cluster")
    +      .set("spark.ui.enabled", "false")
    +      // Disable this so that if a task is running, we can make sure the executor will always send
    +      // task metrics via heartbeat to driver.
    +      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
    +      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
    +      .set("spark.executor.heartbeatInterval", "1s")
    +    sc = new SparkContext(conf)
    +    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
    +    @volatile var runningTaskIds: Seq[Long] = null
    +    val listener = new SparkListener {
    +      override def onExecutorMetricsUpdate(
    +          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    +        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
    +          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
    +        }
    +      }
    +    }
    +    sc.addSparkListener(listener)
    +    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
    +      val context = org.apache.spark.TaskContext.get()
    +      if (context.stageAttemptNumber == 0) {
    +        if (context.partitionId == 0) {
    +          // Make the first task in the first stage attempt fail.
    +          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
    +            new java.io.IOException("fake"))
    +        } else {
    +          // Make the second task in the first stage attempt sleep to generate a zombie task
    +          Thread.sleep(60000)
    +        }
    +      } else {
    +        // Make the second stage attempt successful.
    +      }
    +      x
    +    }.collect()
    +    sc.listenerBus.waitUntilEmpty(10000)
    +    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
    +    // whether there is any running task.
    --- End diff ---
    
    I prefer this way because it makes sure the executor actually received the kill command and interrupted the tasks.
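    
    For reference, a minimal sketch (an assumption, not necessarily the exact assertion in this PR) of how the heartbeat-driven `runningTaskIds` from the listener above could be used to verify that the zombie task was really interrupted once the job finished; the 10-second timeout is an illustrative choice:
    
        import org.scalatest.concurrent.Eventually._
        import org.scalatest.time.SpanSugar._
    
        // Poll until the executor's heartbeat reports no running tasks. If the
        // executor had not received the kill command, the sleeping zombie task
        // would keep reporting its accumulator updates here for ~60 seconds.
        eventually(timeout(10.seconds)) {
          assert(runningTaskIds != null && runningTaskIds.isEmpty)
        }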


---
