GitHub user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/22112#discussion_r210967814 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1441,6 +1441,44 @@ class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { + // If the map stage is not idempotent(produces data in a different order when retry) + // and the shuffle partitioner is order sensitive, we have to retry all the tasks of + // the failed stage and its succeeding stages, because the input data of the failed + // stage will be changed after the map tasks are re-tried. + if (!mapStage.rdd.isIdempotent && mapStage.shuffleDep.orderSensitivePartitioner) { + def rollBackStage(stage: Stage): Unit = stage match { + case mapStage: ShuffleMapStage => + if (mapStage.findMissingPartitions().length < mapStage.numPartitions) { --- End diff -- This assumes that every partition of a stage is computed, which is not necessarily true in the general case (compare numTasks with numPartitions).
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org