Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21653#discussion_r199549772
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
}
+
+  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
+ sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+ sc.conf.set("spark.speculation.multiplier", "0.0")
+ sc.conf.set("spark.speculation", "true")
+ val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+
+ // Offer resources for 4 tasks to start
+ for ((k, v) <- List(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec2" -> "host2",
+ "exec2" -> "host2")) {
--- End diff --
nit: double indent the contents of the `List`
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]