Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21653#discussion_r199550367
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -1371,4 +1371,64 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
         val valueSer = SparkEnv.get.serializer.newInstance()
         new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
       }
    +
    +  test("SPARK-13343 speculative tasks that didn't commit shouldn't be 
marked as success") {
    +    sc = new SparkContext("local", "test")
    +    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
    +    val taskSet = FakeTask.createTaskSet(4)
    +    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
    +    sc.conf.set("spark.speculation.multiplier", "0.0")
    +    sc.conf.set("spark.speculation", "true")
    +    val clock = new ManualClock()
    +    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
    +    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
    +      task.metrics.internalAccums
    +    }
    +
    +    // Offer resources for 4 tasks to start
    +    for ((k, v) <- List(
    +      "exec1" -> "host1",
    +      "exec1" -> "host1",
    +      "exec2" -> "host2",
    +      "exec2" -> "host2")) {
    +      val taskOption = manager.resourceOffer(k, v, NO_PREF)
    +      assert(taskOption.isDefined)
    +      val task = taskOption.get
    +      assert(task.executorId === k)
    +    }
    +    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
    +    clock.advance(1)
    +    // Complete 3 of the 4 tasks and leave 1 task running
    +    for (id <- Set(0, 1, 2)) {
    +      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
    +      assert(sched.endedTasks(id) === Success)
    +    }
    +
    +    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
    +    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
    +    // > 0ms, so advance the clock by 1ms here.
    +    clock.advance(1)
    +    assert(manager.checkSpeculatableTasks(0))
    +    assert(sched.speculativeTasks.toSet === Set(3))
    +
    +    // Offer resource to start the speculative attempt for the running task
    +    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
    +    assert(taskOption5.isDefined)
    +    val task5 = taskOption5.get
    +    assert(task5.index === 3)
    +    assert(task5.taskId === 4)
    +    assert(task5.executorId === "exec1")
    +    assert(task5.attemptNumber === 1)
    +    sched.backend = mock(classOf[SchedulerBackend])
    +
    +    // Complete one attempt for the running task
    +    manager.handleSuccessfulTask(3, createTaskResult(3, accumUpdatesByTask(3)))
    +    // Verify that it kills the other running attempt
    +    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
    +    // Complete another attempt for the running task
    --- End diff --
    
    Can you expand this comment to explain why you're doing this? Without
    looking at the bug, it's easy to think this part is wrong, but in fact it's
    the most important part of your test. E.g.:
    
    There is a race between the scheduler asking to kill the other task, and
    that task actually finishing. We simulate what happens if the other task
    finishes before we kill it.
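    
    For instance, the expanded comment could read something like the sketch
    below (just a suggestion; I'm assuming the line that follows in the
    truncated diff is the `handleSuccessfulTask(4, ...)` call that completes
    the speculative attempt):
    
        // Complete the speculative attempt for the running task. There is a
        // race between the scheduler asking to kill the other attempt and that
        // attempt actually finishing: here we simulate the case where the
        // speculative attempt (taskId 4) finishes before the kill takes
        // effect, so its result must not be recorded as a second success for
        // the partition.
        manager.handleSuccessfulTask(4, createTaskResult(3, accumUpdatesByTask(3)))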

