Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20998#discussion_r180439559
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -880,6 +880,59 @@ class TaskSetManagerSuite extends SparkFunSuite with
LocalSparkContext with Logg
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
+ test("speculative task should not run on a given host where another
attempt " +
+ "is already running on") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(
+ sc, ("execA", "host1"), ("execB", "host2"))
+ val taskSet = FakeTask.createTaskSet(1,
+ Seq(TaskLocation("host1", "execA"), TaskLocation("host2", "execB")))
+ val clock = new ManualClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES,
clock = clock)
+
+ // let task0.0 run on host1
+ assert(manager.resourceOffer("execA", "host1",
PROCESS_LOCAL).get.index == 0)
+ val info1 = manager.taskAttempts(0)(0)
+ assert(info1.running === true)
+ assert(info1.host === "host1")
+
+ // long time elapse, and task0.0 is still running,
+ // so we launch a speculative task0.1 on host2
+ clock.advance(1000)
+ manager.speculatableTasks += 0
+ assert(manager.resourceOffer("execB", "host2",
PROCESS_LOCAL).get.index === 0)
+ val info2 = manager.taskAttempts(0)(0)
+ assert(info2.running === true)
+ assert(info2.host === "host2")
+ assert(manager.speculatableTasks.size === 0)
+
+ // now, task0 has two copies running on host1, host2 separately,
+ // so we can not launch a speculative task on any hosts.
+ manager.speculatableTasks += 0
+ assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) === None)
+ assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL) === None)
+ assert(manager.speculatableTasks.size === 1)
+
+ // after a long long time, task0.0 failed, and task0.0 can not re-run
since
+ // there's already a running copy.
+ clock.advance(1000)
+ info1.finishTime = clock.getTimeMillis()
+ assert(info1.running === false)
--- End diff --
`assert(!info1.running)`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]