xuanyuanking opened a new pull request #24892: [SPARK-25341][Core] Support 
rolling back a shuffle map stage and re-generate the shuffle files
URL: https://github.com/apache/spark/pull/24892
 
 
   After the newly added shuffle block fetching protocol in #24565, we can continue this work by extending the FetchShuffleBlocks message. Closes #24110.
   
   ## What changes were proposed in this pull request?
   
   This is follow-up work for #22112's future improvement [1]: `Currently we can't rollback and rerun a shuffle map stage, and just fail.`
   
   Spark reruns a finished shuffle map stage when it hits fetch failures. Currently, the rerun stage only resubmits tasks for the missing partitions and reuses the output of the other partitions. This is fine in most scenarios, but for indeterminate operations (like repartition), different shuffle write attempts may write different data, so rerunning only the missing partitions leads to a correctness bug. For the shuffle map stage of an indeterminate operation, we therefore need to support rolling back the whole stage and re-generating its shuffle files. In this patch, we achieve this by tagging the stage as indeterminate: when an indeterminate stage is resubmitted, we clear all existing map statuses and rerun all partitions. We also add properties to the task's LocalProperties to record that the stage is a retried indeterminate stage, so that the shuffle files generated by the retry keep the attempt id in their file names and the corresponding reduce side can specify which attempt of the shuffle it wants to read.
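   
   For illustration only, here is a minimal sketch of the file-naming idea described above, assuming a hypothetical `indeterminateAttemptId` parameter threaded through to the block resolver (the actual naming lives in Spark's shuffle block resolver and may differ):
   ```
   // Illustrative sketch only, not the actual Spark internals.
   def shuffleDataFileName(
       shuffleId: Int,
       mapId: Int,
       indeterminateAttemptId: Option[Int]): String = {
     val base = s"shuffle_${shuffleId}_${mapId}_0"
     indeterminateAttemptId match {
       // Retried indeterminate stage: keep the attempt id in the file name so the
       // reduce side can request exactly the generation it expects.
       case Some(attemptId) => s"${base}_$attemptId.data"
       // Determinate stage (or first attempt): the file name is unchanged.
       case None => s"$base.data"
     }
   }
   ```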
   
   All changes are summarized as follows:
   - Extend the FetchShuffleBlocks message with a shuffleGenerationId.
   - Add corresponding support in ShuffleBlockResolver: if a shuffle file was generated by an indeterminate stage, its name contains the indeterminateAttemptId; otherwise the file name stays the same as before.
   - Add the retried indeterminate stage info to TaskContext.localProperties and use it in the shuffle reader and writer (see the sketch after this list).
   - Add a determinate flag in Stage and use it in DAGScheduler and in the cleanup of intermediate stages.
   
   ## How was this patch tested?
   
   - UT: Added unit tests for all changed code and newly added functions.
   - Manual test: the following manual test verifies the effect.
   ```
   import scala.sys.process._
   import org.apache.spark.TaskContext
   
   // A job with several indeterminate (repartition) shuffle stages.
   val determinateStage0 = sc.parallelize(0 until 1000 * 1000 * 100, 10)
   val indeterminateStage1 = determinateStage0.repartition(200)
   val indeterminateStage2 = indeterminateStage1.repartition(200)
   val indeterminateStage3 = indeterminateStage2.repartition(100)
   val indeterminateStage4 = indeterminateStage3.repartition(300)
   // Kill the executor on the first attempt of partition 190 to trigger a fetch failure.
   val fetchFailIndeterminateStage4 = indeterminateStage4.map { x =>
     if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId == 190 &&
         TaskContext.get.stageAttemptNumber == 0) {
       throw new Exception("pkill -f -n java".!!)
     }
     x
   }
   val indeterminateStage5 = fetchFailIndeterminateStage4.repartition(200)
   val finalStage6 = indeterminateStage5.repartition(100).collect().distinct.length
   ```
   This is a simple job with multiple indeterminate stages. It returns a wrong answer on older Spark versions like 2.2/2.3, and the job is killed after #22112. With this fix, the job retries all indeterminate stages, as shown in the screenshot below, and gets the right result.
   
![image](https://user-images.githubusercontent.com/4833765/54444941-c058b080-477e-11e9-8bad-bea13ed21b9e.png)
   
