GitHub user squito commented on the pull request:

    https://github.com/apache/spark/pull/4055#issuecomment-118944562
  
    @suyanNone I agree that right now, that is the way tasks & attempts are 
tracked through the various `Task*` classes.  But, my concern is that it could 
easily lead to very confusing bugs in the future.  The DAGScheduler certainly 
needs to know about multiple tasks with the same stageId and partitionId.  The 
current code can keep track of those tasks even if they happen to have the same 
`hashCode` and `equals`, but I'm still uneasy putting that change in.  
Fundamentally, two tasks for the same stage and partition, but different 
attempts, are *not* the same task, so it's weird for `hashCode` and `equals` to 
act as if they are the same.
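    
    To make that concern concrete, here is a minimal sketch.  `SketchTask` and 
`EqualsSketch` are made-up names for illustration, not the real `Task*` 
classes; the only assumption is that equality ignores the attempt:
    
    ```scala
    import scala.collection.mutable.HashSet

    // Toy stand-in for a task; NOT the real Spark Task class.
    class SketchTask(val stageId: Int, val partitionId: Int, val attempt: Int) {
      // Equality deliberately ignores `attempt`, like the change under discussion.
      override def equals(other: Any): Boolean = other match {
        case t: SketchTask => stageId == t.stageId && partitionId == t.partitionId
        case _ => false
      }
      override def hashCode: Int = 31 * stageId + partitionId
    }

    object EqualsSketch {
      // Adds two attempts for the same (stage, partition) and returns the set.
      def pendingAfterTwoAttempts(): HashSet[SketchTask] = {
        val pending = HashSet[SketchTask]()
        pending += new SketchTask(1, 0, attempt = 0)
        pending += new SketchTask(1, 0, attempt = 1)
        pending // the two attempts collapse into a single entry
      }
    }
    ```
    
    In this toy version the set cannot tell the two attempts apart, so removing 
either one empties it.  That is the kind of surprise I'm worried about.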
    
    OTOH, you might say that for `stage.pendingTasks` in particular, the only 
thing that matters is the partitionId, which would seem reasonable to me.  But 
then I guess I would rename it and change it to `pendingPartitions: 
HashSet[Int]`. (option 1)
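    
    A rough sketch of what option 1 could look like.  `SketchStage` and its 
two methods are hypothetical and much simpler than the real `Stage`; only the 
`pendingPartitions: HashSet[Int]` field comes from the suggestion above:
    
    ```scala
    import scala.collection.mutable.HashSet

    // Hypothetical, simplified stage; the real Stage class has many more fields.
    class SketchStage(val id: Int) {
      // Tracks partitions (not task objects) that still need a successful result.
      val pendingPartitions = new HashSet[Int]

      // Called when tasks are submitted for the given partitions.
      def submitMissing(partitions: Seq[Int]): Unit = pendingPartitions ++= partitions

      // Called when any attempt for `partitionId` succeeds; repeats are harmless.
      def onTaskSuccess(partitionId: Int): Unit = pendingPartitions -= partitionId
    }
    ```
    
    With this shape, a second successful attempt for the same partition is a 
no-op, and nothing depends on how `equals` is defined for tasks.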
    
    You brought up a good point about just changing to `isAvailable`.  I don't 
think we can remove the `shuffleStage.pendingTasks.isEmpty` check, b/c that is 
used elsewhere -- specifically, ["run trivial shuffle with out-of-band failure 
and 
retry"](https://github.com/apache/spark/blob/39e4e7e4d89077a637c4cad3a986e0e3447d1ae7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala#L655)
 fails if we only check `isAvailable`.  The reason is that 
`stage.pendingTasks.isEmpty` is also used to know when to call 
`markStageAsFinished`.
    
    As you've noted, the `DAGScheduler` is a very confusing piece of code, and 
it's hard to reason about how things are interconnected.  I wonder if we could 
change the condition to:
    
    ```
    if (runningStages.contains(shuffleStage) && 
      (shuffleStage.pendingTasks.isEmpty || shuffleStage.isAvailable)) {
    ```
    
    That would lead to executing the body of that block multiple times, but I 
think that should be safe.  There shouldn't be a problem in calling 
`markStageAsFinished` if the stage was already finished, nor is there any harm 
in re-registering the map outputs. (option 2)
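    
    To see which states option 2 lets into the block, here is a toy model of 
just the condition.  `StageState`, `pendingOnly` and `option2` are names I made 
up; `pendingOnly` is my assumption of a check that looks only at 
`pendingTasks.isEmpty`, and none of this touches the real scheduler:
    
    ```scala
    // Hypothetical snapshot of the three facts the condition looks at.
    case class StageState(running: Boolean, pendingEmpty: Boolean, available: Boolean)

    object ConditionSketch {
      // Assumed baseline: enter the block only when the pending set is empty.
      def pendingOnly(s: StageState): Boolean = s.running && s.pendingEmpty

      // Option 2: also enter the block when the stage is already available.
      def option2(s: StageState): Boolean =
        s.running && (s.pendingEmpty || s.available)
    }
    ```
    
    The two differ only for a running stage that is available while something 
is still pending, which is exactly the case where the body would run more than 
once.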
    
    Finally, we could add a separate clause just to call `markStageAsFinished` 
when `shuffleStage.pendingTasks.isEmpty`. (option 3)
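    
    One possible reading of option 3, again as a toy.  `Option3Sketch`, 
`onTaskCompletion` and the `"registerMapOutputs"` label are hypothetical, and 
the function only reports which steps would run rather than doing anything:
    
    ```scala
    object Option3Sketch {
      // Returns the names of the steps that would run, in order.
      def onTaskCompletion(running: Boolean,
                           pendingEmpty: Boolean,
                           available: Boolean): Seq[String] = {
        // Separate clause: finishing depends only on the pending set.
        val finish =
          if (running && pendingEmpty) Seq("markStageAsFinished") else Seq.empty
        // Main block: the rest of the work depends only on availability.
        val register =
          if (running && available) Seq("registerMapOutputs") else Seq.empty
        finish ++ register
      }
    }
    ```
    
    The point of splitting it this way is that each clause has a single 
trigger, which might be easier to reason about than the combined condition.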
    
    I just played around with it a little bit, and it seems that all 3 options 
lead to all the tests in `DAGSchedulerSuite` passing.  But that still doesn't 
make me super confident in any of them :P.  Let me think on this a little more, 
and then let's try to get more reviewers?

