Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21653#discussion_r199550557
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logging
         val valueSer = SparkEnv.get.serializer.newInstance()
         new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
       }
    +
    +  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
    +    sc = new SparkContext("local", "test")
    +    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    +    val taskSet = FakeTask.createTaskSet(4)
    +    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    +    sc.conf.set("spark.speculation.multiplier", "0.0")
    +    sc.conf.set("spark.speculation", "true")
    +    val clock = new ManualClock()
    +    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
    +    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
    +      task.metrics.internalAccums
    +    }
    +
    +    // Offer resources for 4 tasks to start
    +    for ((k, v) <- List(
    +      "exec1" -> "host1",
    +      "exec1" -> "host1",
    +      "exec2" -> "host2",
    +      "exec2" -> "host2")) {
    +      val taskOption = manager.resourceOffer(k, v, NO_PREF)
    +      assert(taskOption.isDefined)
    +      val task = taskOption.get
    +      assert(task.executorId === k)
    +    }
    +    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    +    clock.advance(1)
    +    // Complete 3 of the tasks and leave 1 task running
    +    for (id <- Set(0, 1, 2)) {
    +      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
    +      assert(sched.endedTasks(id) === Success)
    +    }
    +
    +    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    +    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    +    // > 0ms, so advance the clock by 1ms here.
    +    clock.advance(1)
    +    assert(manager.checkSpeculatableTasks(0))
    +    assert(sched.speculativeTasks.toSet === Set(3))
    +
    +    // Offer resource to start the speculative attempt for the running task
    +    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    +    assert(taskOption5.isDefined)
    +    val task5 = taskOption5.get
    +    assert(task5.index === 3)
    +    assert(task5.taskId === 4)
    +    assert(task5.executorId === "exec1")
    +    assert(task5.attemptNumber === 1)
    +    sched.backend = mock(classOf[SchedulerBackend])
    +
    +    // Complete one attempt for the running task
    +    manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
    +    // Verify that it kills the other running attempt
    +    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
    +    // Complete another attempt for the running task
    +    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
    +
    +    assert(manager.taskInfos(3).successful)
    +    assert(manager.taskInfos(4).killed)
    --- End diff ---
    
    it seems the main thing you're trying to change here is what gets passed to `DAGScheduler.taskEnded`, so shouldn't you be verifying that here?
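    
    For example (a hedged sketch, not code from this PR — `mockDAGScheduler` and the captor check are illustrative): assuming a Mockito mock can be swapped in for `sched.dagScheduler` before the second attempt completes, and using the five-argument `taskEnded` signature this version of Spark has, the verification might look like:
    
    ```scala
    import org.mockito.ArgumentCaptor
    // `any` is org.mockito.Matchers.any on Mockito 1.x; on 2.x it moved to
    // org.mockito.ArgumentMatchers
    import org.mockito.ArgumentMatchers.any
    import org.mockito.Mockito.{mock, verify}
    
    // Install a mock DAGScheduler before completing the second attempt, so the
    // reason TaskSetManager reports for it can be inspected.
    val mockDAGScheduler = mock(classOf[DAGScheduler])
    sched.dagScheduler = mockDAGScheduler
    manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))
    
    // Capture the TaskEndReason passed to taskEnded and assert that the attempt
    // which did not commit is not reported as Success.
    val reasonCaptor = ArgumentCaptor.forClass(classOf[TaskEndReason])
    verify(mockDAGScheduler).taskEnded(any(), reasonCaptor.capture(), any(), any(), any())
    assert(reasonCaptor.getValue !== Success)
    ```
    
    That would assert on the actual `TaskEndReason` reaching the DAGScheduler, rather than only the `TaskInfo` bookkeeping.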

