Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22112#discussion_r210967814
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
    @@ -1441,6 +1441,44 @@ class DAGScheduler(
                 failedStages += failedStage
                 failedStages += mapStage
                 if (noResubmitEnqueued) {
    +              // If the map stage is not idempotent(produces data in a 
different order when retry)
    +              // and the shuffle partitioner is order sensitive, we have 
to retry all the tasks of
    +              // the failed stage and its succeeding stages, because the 
input data of the failed
    +              // stage will be changed after the map tasks are re-tried.
    +              if (!mapStage.rdd.isIdempotent && 
mapStage.shuffleDep.orderSensitivePartitioner) {
    +                def rollBackStage(stage: Stage): Unit = stage match {
    +                  case mapStage: ShuffleMapStage =>
    +                    if (mapStage.findMissingPartitions().length < 
mapStage.numPartitions) {
    --- End diff --
    
    This is making an assumption that all partitions of a stage are getting 
computed - which is not necessarily true in a general case (see numTasks vs 
numPartitions).



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to