xuanyuanking commented on a change in pull request #24892: [SPARK-25341][Core] 
Support rolling back a shuffle map stage and re-generate the shuffle files
URL: https://github.com/apache/spark/pull/24892#discussion_r300167945
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
 ##########
 @@ -2753,15 +2753,140 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     }.head.findMissingPartitions() == Seq(0, 1))
 
     scheduler.resubmitFailedStages()
+    (shuffleId1, shuffleId2)
+  }
 
+  test("SPARK-25341: abort stage while using old fetch protocol") {
+    // reset the test context with using old fetch protocol
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL.key, "true")
+    init(conf)
+
+    val (shuffleId1, _) = constructIndeterminateStageRetryScenario()
     // The first task of the `shuffleMapRdd2` failed with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(1),
+      Success,
+      makeMapStatus("hostC", 2)))
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
       null))
 
-    // The job should fail because Spark can't rollback the shuffle map stage.
-    assert(failure != null && failure.getMessage.contains("Spark cannot 
rollback"))
+    // The job should fail because Spark can't rollback the shuffle map stage 
while
+    // using old protocol.
+    assert(failure != null && failure.getMessage.contains(
+      "Spark can only do this while using the new shuffle block fetching 
protocol"))
+  }
+
+  test("SPARK-25341: retry all the succeeding stages when the map stage is 
indeterminate") {
+    val (shuffleId1, shuffleId2) = constructIndeterminateStageRetryScenario()
+
+    // The first task of the `shuffleMapRdd2` failed with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
+      null))
+
+    val newFailedStages = scheduler.failedStages.toSeq
+    assert(newFailedStages.map(_.id) == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // First shuffle map stage resubmitted and reran all tasks.
+    assert(taskSets(4).stageId == 0)
+    assert(taskSets(4).stageAttemptId == 1)
+    assert(taskSets(4).tasks.length == 2)
+
+    // Finish all stage.
+    complete(taskSets(4), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+    assert(taskSets(4).tasks.head.localProperties.getProperty(
+      SparkContext.SHUFFLE_GENERATION_ID_PREFIX + shuffleId1.toString) == "1")
+
+    complete(taskSets(5), Seq(
+      (Success, makeMapStatus("hostC", 2)),
+      (Success, makeMapStatus("hostD", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === 
Some(Seq.empty))
+    assert(taskSets(5).tasks.head.localProperties.getProperty(
+      SparkContext.SHUFFLE_GENERATION_ID_PREFIX + shuffleId2.toString) == "2")
+
+    complete(taskSets(6), Seq((Success, 11), (Success, 12)))
+
+    // Job successful ended.
+    assert(results === Map(0 -> 11, 1 -> 12))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-25341: continuous indeterminate stage roll back") {
+    // shuffleMapRdd1/2/3 are all indeterminate.
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true)
+    val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new 
HashPartitioner(2))
+    val shuffleId1 = shuffleDep1.shuffleId
+
+    val shuffleMapRdd2 = new MyRDD(
+      sc, 2, List(shuffleDep1), tracker = mapOutputTracker, indeterminate = 
true)
+    val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, new 
HashPartitioner(2))
+    val shuffleId2 = shuffleDep2.shuffleId
+
+    val shuffleMapRdd3 = new MyRDD(
+      sc, 2, List(shuffleDep2), tracker = mapOutputTracker, indeterminate = 
true)
+    val shuffleDep3 = new ShuffleDependency(shuffleMapRdd3, new 
HashPartitioner(2))
+    val shuffleId3 = shuffleDep3.shuffleId
+    val finalRdd = new MyRDD(sc, 2, List(shuffleDep3), tracker = 
mapOutputTracker)
+
+    submit(finalRdd, Array(0, 1), properties = new Properties())
+
+    // Finish the first 2 shuffle map stages.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostD", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === 
Some(Seq.empty))
+
+    // Executor lost on hostB, both of stage 0 and 1 should be reran.
+    runEvent(makeCompletionEvent(
+      taskSets(2).tasks(0),
+      FetchFailed(makeBlockManagerId("hostB"), shuffleId2, 0, 0, "ignored"),
+      null))
+    mapOutputTracker.removeOutputsOnHost("hostB")
+
+    assert(scheduler.failedStages.toSeq.map(_.id) == Seq(1, 2))
 
 Review comment:
   Because stage 0 already succeeded, the FetchFailed marks stages 1 and 2 as failed.
   In this test, I use a combination of FetchFailed and removeOutputsOnHost to 
simulate an executor loss and check that all indeterminate stages are rerun. The 
scenario I want to test is this: although the indeterminate stage 0 has not failed 
and is only missing an output, when its child stage is rerun, the whole of 
stage 0 is rerun as well.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to