Github user squito commented on the pull request:
https://github.com/apache/spark/pull/4055#issuecomment-118944562
@suyanNone I agree that right now, that is the way tasks & attempts are
tracked through the various `Task*` classes. But, my concern is that it could
easily lead to very confusing bugs in the future. The DAGScheduler certainly
needs to know about multiple tasks with the same stageId and partitionId. the
current code can keep track of those tacks even if they happen to have the same
`hashCode` and `equals`, but I'm still uneasy putting that change in.
Fundamentally, two tasks for the same stage and partition, but different
attempts, are *not* the same task. so its weird for `hashCode` and `equals` to
act as if they are the same.
OTOH, you might say that for `stage.pendingTasks` in particular, the only
thing that matters is the partitionId, which would seem reasonable to me. But
then I guess I would rename it and change it to `pendingPartitions:
HashSet[Int]`. (option 1)
You brought up a good point about just changing to `isAvailable`. I don't
think we can remove the `shuffleStage.pendingTasks.isEmpty` check, b/c that is
used elsewhere -- specifically, ["run trivial shuffle with out-of-band failure
and
retry"](https://github.com/apache/spark/blob/39e4e7e4d89077a637c4cad3a986e0e3447d1ae7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala#L655)
fails if we only check `isAvailable`. The reason is because
`stage.pendingTasks.isEmpty` is also used to know when to call
`markStageAsFinished`.
As you've noted, the `DAGScheduler` is a very confusing piece of code, its
hard to reason about how things are interconnected. I wonder if we could
change the condition to:
```
if (runningStages.contains(shuffleStage) &&
(shuffleStage.pendingTasks.isEmpty || shuffleStage.isAvailable)) {
```
That would lead to executing the body of that block multiple times, but I
think that should be safe. There shouldn't be a problem in calling
`markStageAsFinished` if the stage was already finished, nor is there any harm
in re-registering the map outputs. (option 2)
Finally, we could add a separate clause just to call `markStageAsFinished`
when `shuffleStage.pendingTasks.isEmpty`. (option 3)
I just played around with it a little bit, it seems that all 3 options lead
to all the tests in `DAGSchedulerSuite` passing. But it still doesn't make me
super confident in any of them :P. Let me think on this a little more and then
lets try to get more reviewers?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]