xuanyuanking opened a new pull request #24892: [SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files
URL: https://github.com/apache/spark/pull/24892

After the newly added shuffle block fetching protocol in #24565, we can continue this work by extending the FetchShuffleBlocks message.

Closes #24110.

## What changes were proposed in this pull request?

This is a follow-up to #22112's future improvement [1]: `Currently we can't rollback and rerun a shuffle map stage, and just fail.`

Spark reruns a finished shuffle map stage when it hits fetch failures. Currently, the rerun stage only resubmits tasks for the missing partitions and reuses the output of the other partitions. This logic is fine in most scenarios, but for indeterminate operations (like repartition), different shuffle write attempts may write different data, so rerunning only the missing partitions leads to a correctness bug. For the shuffle map stage of indeterminate operations, we therefore need to support rolling back the whole stage and re-generating the shuffle files.

In this patch, we achieve this by tagging the stage as indeterminate. When an indeterminate stage is resubmitted, we clear all existing map statuses and rerun all partitions. We also record information about the retried indeterminate stage in the task's local properties, so that shuffle files generated by a retried indeterminate stage keep the attempt id in their file names, and the corresponding reduce side can specify which attempt of the shuffle it wants to read.

All changes are summarized as follows (rough, illustrative sketches of the protocol, file-naming, and scheduler changes follow the test section below):
- Extend the FetchShuffleBlocks message with a shuffleGenerationId.
- Add corresponding support in ShuffleBlockResolver: if a shuffle file is generated by an indeterminate stage, its name contains the indeterminateAttemptId; otherwise the file name stays the same as before.
- Record the retried indeterminate stage info in TaskContext.localProperties and use it in the shuffle reader and writer.
- Add a determinate flag to Stage and use it in DAGScheduler and in the cleanup of intermediate stages.

## How was this patch tested?

- UT: Added unit tests for all changed code and newly added functions.
- Manual test: the job below verifies the end-to-end behavior.

```scala
import scala.sys.process._
import org.apache.spark.TaskContext

val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
val indeterminateStage1 = determinateStage0.repartition(200)
val indeterminateStage2 = indeterminateStage1.repartition(200)
val indeterminateStage3 = indeterminateStage2.repartition(100)
val indeterminateStage4 = indeterminateStage3.repartition(300)
val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length
```

It's a simple job with multiple indeterminate stages. It gets a wrong answer on older Spark versions like 2.2/2.3, and is killed after #22112. With this fix, the job retries all indeterminate stages (as shown in the screenshot below) and gets the right result.
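To make the protocol bullet above concrete, here is a rough sketch of what the extended fetch request could carry. This is not the actual FetchShuffleBlocks class (which lives in the network-shuffle module introduced in #24565); the field names and types below are assumptions, used only to show where a shuffleGenerationId would fit.

```scala
// Simplified, illustrative model of the fetch request after this change.
// The real FetchShuffleBlocks message is a Java class in the network-shuffle
// module; the field names and types here are an assumption, not the API.
final case class FetchShuffleBlocksSketch(
    appId: String,
    execId: String,
    shuffleId: Int,
    // -1 would mean "read the original (determinate) shuffle output";
    // otherwise, the stage attempt whose shuffle files the reducer wants.
    shuffleGenerationId: Int,
    mapIds: Array[Int],
    reduceIds: Array[Array[Int]])
```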
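The file-naming change can be pictured with a small, self-contained helper. The helper name and the exact suffix format are assumptions, not the real ShuffleBlockResolver code; the point is simply that a determinate stage keeps its original file name, while a retried indeterminate stage keeps the attempt id in the name so reducers can pick the right generation.

```scala
// Illustrative only: a hypothetical naming rule for shuffle data files.
// The actual change is in the shuffle block resolver; the suffix format
// here is an assumption made for explanation purposes.
object ShuffleFileNamingSketch {
  /**
   * shuffleGenerationId == -1: determinate stage, the name is unchanged.
   * shuffleGenerationId >=  0: retried indeterminate stage, the attempt id
   * is kept in the file name.
   */
  def dataFileName(shuffleId: Int, mapId: Int, shuffleGenerationId: Int): String = {
    val base = s"shuffle_${shuffleId}_${mapId}_0.data"
    if (shuffleGenerationId < 0) base else s"$base.$shuffleGenerationId"
  }
}

// ShuffleFileNamingSketch.dataFileName(3, 7, -1) => "shuffle_3_7_0.data"
// ShuffleFileNamingSketch.dataFileName(3, 7, 2)  => "shuffle_3_7_0.data.2"
```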
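Finally, the scheduler-side rule can be sketched in isolation. The code below is not the DAGScheduler implementation; it only models the decision described above with a hypothetical map-status array: on resubmission after a fetch failure, a determinate stage recomputes only the missing partitions, while an indeterminate stage drops all map statuses and recomputes everything.

```scala
// Illustrative only: models the resubmission rule, not the DAGScheduler code.
// `mapStatuses` stands in for the real MapStatus bookkeeping:
// Some(location) means the map output for that partition is still available.
object StageRollbackSketch {
  final case class MapStageState(
      isIndeterminate: Boolean,
      mapStatuses: Array[Option[String]])

  def partitionsToRecompute(stage: MapStageState): (MapStageState, Seq[Int]) = {
    if (stage.isIndeterminate) {
      // Roll back the whole stage: discard every map output and rerun all tasks.
      val cleared =
        stage.copy(mapStatuses = Array.fill[Option[String]](stage.mapStatuses.length)(None))
      (cleared, cleared.mapStatuses.indices)
    } else {
      // Determinate stage: rerun only the partitions whose output is missing.
      (stage, stage.mapStatuses.indices.filter(stage.mapStatuses(_).isEmpty))
    }
  }
}
```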
