This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ad238a2 [SPARK-29976][CORE] Trigger speculation for stages with too few tasks ad238a2 is described below commit ad238a2238a9d0da89be4424574436cbfaee579d Author: Yuchen Huo <yuchen....@databricks.com> AuthorDate: Tue Dec 10 14:43:26 2019 -0600 [SPARK-29976][CORE] Trigger speculation for stages with too few tasks ### What changes were proposed in this pull request? This PR adds an optional Spark config for speculation to allow speculative runs for stages that have only a few tasks. ``` spark.speculation.task.duration.threshold ``` If provided, tasks are speculatively run if the TaskSet contains no more tasks than the number of slots on a single executor and the task has been running longer than the threshold. ### Why are the changes needed? This change helps avoid scenarios where a single executor hangs forever (for example, due to a disk issue) and the only task in a TaskSet happens to have been assigned to that executor, causing the whole job to hang forever. ### Does this PR introduce any user-facing change? Yes. If the new config `spark.speculation.task.duration.threshold` is provided, the TaskSet contains no more tasks than the number of slots on a single executor, and a running task has been running longer than the threshold, then a speculative copy of that task is submitted. ### How was this patch tested? Unit tests are added to TaskSetManagerSuite. Closes #26614 from yuchenhuo/SPARK-29976. 
Authored-by: Yuchen Huo <yuchen....@databricks.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../org/apache/spark/internal/config/package.scala | 12 +++ .../apache/spark/scheduler/TaskSetManager.scala | 60 +++++++++---- .../spark/scheduler/TaskSetManagerSuite.scala | 98 ++++++++++++++++++++++ docs/configuration.md | 13 +++ 4 files changed, 167 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 25dc4c6..9d7b31a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1499,6 +1499,18 @@ package object config { .doubleConf .createWithDefault(0.75) + private[spark] val SPECULATION_TASK_DURATION_THRESHOLD = + ConfigBuilder("spark.speculation.task.duration.threshold") + .doc("Task duration after which scheduler would try to speculative run the task. If " + + "provided, tasks would be speculatively run if current stage contains less tasks " + + "than or equal to the number of slots on a single executor and the task is taking " + + "longer time than the threshold. This config helps speculate stage with very few " + + "tasks. Regular speculation configs may also apply if the executor slots are " + + "large enough. E.g. 
tasks might be re-launched if there are enough successful runs " + + "even though the threshold hasn't been reached.") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") .doc("Staging directory used while submitting applications.") .stringConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 5c0bc49..e026e90 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -81,6 +81,13 @@ private[spark] class TaskSetManager( val speculationQuantile = conf.get(SPECULATION_QUANTILE) val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER) val minFinishedForSpeculation = math.max((speculationQuantile * numTasks).floor.toInt, 1) + // User provided threshold for speculation regardless of whether the quantile has been reached + val speculationTaskDurationThresOpt = conf.get(SPECULATION_TASK_DURATION_THRESHOLD) + // SPARK-29976: Only when the total number of tasks in the stage is less than or equal to the + // number of slots on a single executor, would the task manager speculative run the tasks if + // their duration is longer than the given threshold. In this way, we wouldn't speculate too + // aggressively but still handle basic cases. + val speculationTasksLessEqToSlots = numTasks <= (conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK) // For each task, tracks whether a copy of the task has succeeded. A task will also be // marked as "succeeded" if it failed with a fetch failure, in which case it should not @@ -958,14 +965,40 @@ private[spark] class TaskSetManager( } /** + * Check if the task associated with the given tid has past the time threshold and should be + * speculative run. 
+ */ + private def checkAndSubmitSpeculatableTask( + tid: Long, + currentTimeMillis: Long, + threshold: Double): Boolean = { + val info = taskInfos(tid) + val index = info.index + if (!successful(index) && copiesRunning(index) == 1 && + info.timeRunning(currentTimeMillis) > threshold && !speculatableTasks.contains(index)) { + addPendingTask(index, speculatable = true) + logInfo( + ("Marking task %d in stage %s (on %s) as speculatable because it ran more" + + " than %.0f ms(%d speculatable tasks in this taskset now)") + .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) + speculatableTasks += index + sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) + true + } else { + false + } + } + + /** * Check for tasks to be speculated and return true if there are any. This is called periodically * by the TaskScheduler. * */ override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = { - // Can't speculate if we only have one task, and no need to speculate if the task set is a - // zombie or is from a barrier stage. - if (isZombie || isBarrier || numTasks == 1) { + // No need to speculate if the task set is zombie or is from a barrier stage. If there is only + // one task we don't speculate since we don't have metrics to decide whether it's taking too + // long or not, unless a task duration threshold is explicitly provided. + if (isZombie || isBarrier || (numTasks == 1 && !speculationTaskDurationThresOpt.isDefined)) { return false } var foundTasks = false @@ -983,19 +1016,14 @@ private[spark] class TaskSetManager( // bound based on that. 
logDebug("Task length threshold for speculation: " + threshold) for (tid <- runningTasksSet) { - val info = taskInfos(tid) - val index = info.index - if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && - !speculatableTasks.contains(index)) { - addPendingTask(index, speculatable = true) - logInfo( - ("Marking task %d in stage %s (on %s) as speculatable because it ran more" + - " than %.0f ms(%d speculatable tasks in this taskset now)") - .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) - speculatableTasks += index - sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) - foundTasks = true - } + foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold) + } + } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) { + val time = clock.getTimeMillis() + val threshold = speculationTaskDurationThresOpt.get + logDebug(s"Tasks taking longer time than provided speculation threshold: $threshold") + for (tid <- runningTasksSet) { + foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold) } } foundTasks diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 34bcae8..1d64832 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1778,6 +1778,104 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) } + private def testSpeculationDurationSetup( + speculationThresholdOpt: Option[String], + speculationQuantile: Double, + numTasks: Int, + numSlots: Int): (TaskSetManager, ManualClock) = { + sc = new SparkContext("local", "test") + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE.key, 
speculationQuantile.toString) + // Set the number of slots per executor + sc.conf.set(config.EXECUTOR_CORES.key, numSlots.toString) + sc.conf.set(config.CPUS_PER_TASK.key, "1") + if (speculationThresholdOpt.isDefined) { + sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get) + } + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + // Create a task set with the given number of tasks + val taskSet = FakeTask.createTaskSet(numTasks) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + manager.isZombie = false + + // Offer resources for the task to start + for (i <- 1 to numTasks) { + manager.resourceOffer(s"exec$i", s"host$i", NO_PREF) + } + (manager, clock) + } + + private def testSpeculationDurationThreshold( + speculationThresholdProvided: Boolean, + numTasks: Int, + numSlots: Int): Unit = { + val (manager, clock) = testSpeculationDurationSetup( + // Set the threshold to be 60 minutes + if (speculationThresholdProvided) Some("60min") else None, + // Set the quantile to be 1.0 so that regular speculation would not be triggered + 1.0, + numTasks, + numSlots + ) + + // if the time threshold has not been exceeded, no speculative run should be triggered + clock.advance(1000*60*60) + assert(!manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.size == 0) + + // Now the task should have been running for 60 minutes and 1 second + clock.advance(1) + if (speculationThresholdProvided && numSlots >= numTasks) { + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.size == numTasks) + // Should not submit duplicated tasks + assert(!manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.size == numTasks) + } else { + // If the feature flag is turned off, or the stage contains too many tasks + assert(!manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.size == 0) + } + } + + Seq(1, 2).foreach { 
numTasks => + test("SPARK-29976 when a speculation time threshold is provided, should speculative " + + s"run the task even if there are not enough successful runs, total tasks: $numTasks") { + testSpeculationDurationThreshold(true, numTasks, numTasks) + } + + test("SPARK-29976: when the speculation time threshold is not provided," + + s"don't speculative run if there are not enough successful runs, total tasks: $numTasks") { + testSpeculationDurationThreshold(false, numTasks, numTasks) + } + } + + test("SPARK-29976 when a speculation time threshold is provided, should not speculative " + + "if there are too many tasks in the stage even though time threshold is provided") { + testSpeculationDurationThreshold(true, 2, 1) + } + + test("SPARK-29976 Regular speculation configs should still take effect even when a " + + "threshold is provided") { + val (manager, clock) = testSpeculationDurationSetup( + Some("60min"), + speculationQuantile = 0.5, + numTasks = 2, + numSlots = 2 + ) + + // Task duration can't be 0, advance 1 sec + clock.advance(1000) + // Mark one of the task succeeded, which should satisfy the quantile + manager.handleSuccessfulTask(0, createTaskResult(0)) + // Advance 1 more second so the remaining task takes longer than medium but doesn't satisfy the + // duration threshold yet + clock.advance(1000) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.size == 1) + } + test("TaskOutputFileAlreadyExistException lead to task set abortion") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1")) diff --git a/docs/configuration.md b/docs/configuration.md index 8cd285c..9375896 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -2034,6 +2034,19 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td><code>spark.speculation.task.duration.threshold</code></td> + <td>None</td> + <td> + Task duration after which scheduler would try to 
speculatively run the task. If provided, tasks + would be speculatively run if the current stage contains no more tasks than the number of + slots on a single executor and the task has been running longer than the threshold. This config + helps speculate stages with very few tasks. Regular speculation configs may also apply if the + executor slots are large enough, e.g. tasks might be re-launched if there are enough successful + runs even though the threshold hasn't been reached. + Default unit is milliseconds, unless otherwise specified. + </td> +</tr> +<tr> <td><code>spark.task.cpus</code></td> <td>1</td> <td> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org