Github user squito commented on the pull request:
https://github.com/apache/spark/pull/4055#issuecomment-114653145
@suyanNone I think the problem w/ the test case as-is, is that you are
still having multiple completion events for the exact same task, which
doesn't really happen. I think the situation you are looking to recreate is a
little different -- after a fetch failure, what happens is the other tasks in
that task set will continue to run. (the task set is marked as a zombie, but
it doesn't stop the tasks from running). So you can end up with some tasks
finishing from the zombie stage attempt, at the same time as some tasks finish
from the new attempt, and that's how you can trigger the situation you are
describing.
Here's the test case I came up with:
```scala
test("run with ShuffleMapStage retry") {
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, null)
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// things start out smoothly, stage 0 completes with no issues
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
(Success, makeMapStatus("hostA", shuffleMapRdd.partitions.size))
))
// then one executor dies, and a task fails in stage 1
runEvent(ExecutorLost("exec-hostA"))
runEvent(CompletionEvent(taskSets(1).tasks(0),
FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
null, null, createFakeTaskInfo(), null))
// so we resubmit stage 0, which completes happily
scheduler.resubmitFailedStages()
val stage0Resubmit = taskSets(2)
assert(stage0Resubmit.stageId == 0)
assert(stage0Resubmit.attempt === 1)
val task = stage0Resubmit.tasks(0)
assert(task.partitionId === 2)
runEvent(CompletionEvent(task, Success,
makeMapStatus("hostC", shuffleMapRdd.partitions.size), null,
createFakeTaskInfo(), null))
// now here is where things get tricky: we will now have a task set representing
// the second attempt for stage 1, but we *also* have some tasks for the first attempt for
// stage 1 still going
val stage1Resubmit = taskSets(3)
assert(stage1Resubmit.stageId == 1)
assert(stage1Resubmit.attempt === 1)
assert(stage1Resubmit.tasks.length === 3)
// we'll have some tasks finish from the first attempt, and some finish from the second attempt,
// so that we actually have all stage outputs, though no attempt has completed all its
// tasks
runEvent(CompletionEvent(taskSets(3).tasks(0), Success,
makeMapStatus("hostC", reduceRdd.partitions.size), null,
createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSets(3).tasks(1), Success,
makeMapStatus("hostC", reduceRdd.partitions.size), null,
createFakeTaskInfo(), null))
// late task finish from the first attempt
runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
makeMapStatus("hostB", reduceRdd.partitions.size), null,
createFakeTaskInfo(), null))
// What should happen now is that we submit stage 2. However, we might not see an error
// b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But
// we can check some conditions.
// Note that the really important thing here is not so much that we submit stage 2 *immediately*
// but that we don't end up with some error from these interleaved completions. It would also
// be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
// all its tasks complete
// check that we have all the map output for stage 0 (it should have been there even before
// the last round of completions from stage 1, but just to double check it hasn't been messed
// up)
(0 until 3).foreach { reduceIdx =>
val arr = mapOutputTracker.getServerStatuses(0, reduceIdx)
assert(arr != null)
assert(arr.nonEmpty)
}
// and check we have all the map output for stage 1
(0 until 1).foreach { reduceIdx =>
val arr = mapOutputTracker.getServerStatuses(1, reduceIdx)
assert(arr != null)
assert(arr.nonEmpty)
}
// and check that stage 2 has been submitted
assert(taskSets.size == 5)
val stage2TaskSet = taskSets(4)
assert(stage2TaskSet.stageId == 2)
assert(stage2TaskSet.attempt == 0)
}
```
And sure enough, your changes to `Task`, by adding `equals` and `hashCode`
make that test case pass.
However, I do **not** think that is the right change. I don't like the
idea of tasks being considered equal just because they are the same stage &
partition -- you can have multiple tasks running concurrently for the same
stage & partition from different attempts, and it seems weird to consider them
equal.
A simple solution might be to just change `stage.pendingTasks` to key on
stage & partition, without changing `Task` for all uses. (eg., make
`stage.pendingTasks` a `Map[(Int,Int),Task]`.)
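To make the keying idea concrete, here is a rough, standalone sketch. It is not the actual `DAGScheduler` code: `SketchTask` and `PendingTasks` are hypothetical stand-ins I made up for illustration, and the real `Task` has many more fields. The point is only that a late completion from a zombie attempt clears the pending entry for its partition, while the two task objects themselves stay unequal.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's Task; only the fields needed for the sketch.
// It is a plain class, so equality stays reference-based (no equals/hashCode override).
final class SketchTask(val stageId: Int, val stageAttempt: Int, val partitionId: Int)

// Hypothetical pending-task bookkeeping keyed on (stageId, partitionId)
// instead of on the Task object itself.
final class PendingTasks {
  private val pending = mutable.Map.empty[(Int, Int), SketchTask]

  def add(task: SketchTask): Unit =
    pending((task.stageId, task.partitionId)) = task

  // A completion from *any* attempt clears the entry for that partition.
  def markFinished(task: SketchTask): Unit =
    pending.remove((task.stageId, task.partitionId))

  def isEmpty: Boolean = pending.isEmpty
}
```

With this, a task for stage 1 / partition 2 registered by attempt 1 is cleared when the matching task from attempt 0 finishes late, without the two tasks ever comparing equal.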
I think a better option would be to change the logic of when we register
the mapOutputs, to ignore `stage.pendingTasks` completely, and just use
`stage.isAvailable`. After all, the fundamental problem here is that
`DAGScheduler` is using two different metrics to decide when a stage is
complete, and in the scenario you've discovered, those two metrics differ.
Well, we should just use one metric in all places (as long as it's the right
one).
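As a rough illustration of what "one metric" could look like, here is a simplified model in which a stage counts as complete purely based on whether every partition has a registered output, regardless of which attempt produced it. `SketchStage` and its methods are hypothetical names for this sketch only; they are not the real `Stage` API.

```scala
// Hypothetical, simplified model of a shuffle map stage's output bookkeeping.
final class SketchStage(val numPartitions: Int) {
  // outputLocs(p) holds the hosts that have registered map output for partition p.
  private val outputLocs = Array.fill(numPartitions)(List.empty[String])

  def addOutputLoc(partition: Int, host: String): Unit =
    outputLocs(partition) = host :: outputLocs(partition)

  // The single completion metric: every partition has at least one output location.
  def isAvailable: Boolean = outputLocs.forall(_.nonEmpty)
}
```

In the scenario from the test case, partitions 0 and 1 get outputs from the second attempt and partition 2 from the late first-attempt task; `isAvailable` becomes true at that point even though neither attempt finished all of its tasks.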
Another general issue this points out is that the semantics of some of
these variables really aren't that clear ... eg. should `stage.pendingTasks`
keep track of tasks from "zombie" taskSets?