Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22771#discussion_r226522963

    --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
    @@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
           assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
         }
       }
    +
    +  test("cancel zombie tasks in a result stage when the job finishes") {
    +    val conf = new SparkConf()
    +      .setMaster("local-cluster[1,2,1024]")
    +      .setAppName("test-cluster")
    +      .set("spark.ui.enabled", "false")
    +      // Disable this so that if a task is running, we can make sure the executor will always send
    +      // task metrics via heartbeat to driver.
    +      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
    +      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
    +      .set("spark.executor.heartbeatInterval", "1s")
    +    sc = new SparkContext(conf)
    +    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
    +    @volatile var runningTaskIds: Seq[Long] = null
    +    val listener = new SparkListener {
    +      override def onExecutorMetricsUpdate(
    +          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
    +        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
    +          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
    +        }
    +      }
    +    }
    +    sc.addSparkListener(listener)
    +    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
    +      val context = org.apache.spark.TaskContext.get()
    +      if (context.stageAttemptNumber == 0) {
    +        if (context.partitionId == 0) {
    +          // Make the first task in the first stage attempt fail.
    +          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
    +            new java.io.IOException("fake"))
    +        } else {
    +          // Make the second task in the first stage attempt sleep to generate a zombie task
    +          Thread.sleep(60000)
    +        }
    +      } else {
    +        // Make the second stage attempt successful.
    +      }
    +      x
    +    }.collect()
    +    sc.listenerBus.waitUntilEmpty(10000)
    +    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
    +    // whether there is any running task.
    --- End diff --

    any reason to do it this way, rather than using the TaskStart / TaskEnd events for a SparkListener?
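For reference, the alternative squito is pointing at (driving the check from TaskStart / TaskEnd events on a SparkListener instead of heartbeat-driven executor metrics updates) could look roughly like the sketch below. This is an illustrative sketch only, not code from the PR: the class name RunningTaskTracker and its runningTaskIds helper are hypothetical, and the usage comment assumes the suite's existing sc plus the ScalaTest Eventually helpers already mixed into SparkContextSuite.

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

    // Hypothetical sketch (not from the PR): keep a set of task ids that have
    // started but not yet ended, updated from the task lifecycle events.
    class RunningTaskTracker extends SparkListener {
      private val running = mutable.Set[Long]()

      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
        running += taskStart.taskInfo.taskId
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
        running -= taskEnd.taskInfo.taskId
      }

      // Snapshot of currently running task ids, safe to poll from the test thread.
      def runningTaskIds: Set[Long] = synchronized { running.toSet }
    }

    // Possible use in the test (sketch): register the tracker, run the job, then
    // poll it instead of waiting for a heartbeat-driven metrics update.
    //   val tracker = new RunningTaskTracker
    //   sc.addSparkListener(tracker)
    //   ... trigger the job ...
    //   eventually(timeout(10.seconds)) { assert(tracker.runningTaskIds.isEmpty) }

One trade-off worth noting: TaskStart / TaskEnd are driver-side scheduler events, whereas the heartbeat-based check in the diff observes what the executor itself still reports as running, which seems closer to proving the zombie task was actually interrupted on the executor.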