cloud-fan commented on a change in pull request #24892: [SPARK-25341][Core] 
Support rolling back a shuffle map stage and re-generate the shuffle files
URL: https://github.com/apache/spark/pull/24892#discussion_r312920338
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
 ##########
 @@ -2753,15 +2753,140 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     }.head.findMissingPartitions() == Seq(0, 1))
 
     scheduler.resubmitFailedStages()
+    (shuffleId1, shuffleId2)
+  }
 
+  test("SPARK-25341: abort stage while using old fetch protocol") {
+    // reset the test context with using old fetch protocol
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL.key, "true")
+    init(conf)
+
+    val (shuffleId1, _) = constructIndeterminateStageRetryScenario()
     // The first task of the `shuffleMapRdd2` failed with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(1),
+      Success,
+      makeMapStatus("hostC", 2)))
     runEvent(makeCompletionEvent(
       taskSets(3).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
       null))
 
-    // The job should fail because Spark can't rollback the shuffle map stage.
-    assert(failure != null && failure.getMessage.contains("Spark cannot 
rollback"))
+    // The job should fail because Spark can't rollback the shuffle map stage 
while
+    // using old protocol.
+    assert(failure != null && failure.getMessage.contains(
+      "Spark can only do this while using the new shuffle block fetching 
protocol"))
+  }
+
+  test("SPARK-25341: retry all the succeeding stages when the map stage is 
indeterminate") {
+    val (shuffleId1, shuffleId2) = constructIndeterminateStageRetryScenario()
+
+    // The first task of the `shuffleMapRdd2` failed with fetch failure
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleId1, 0, 0, "ignored"),
+      null))
+
+    val newFailedStages = scheduler.failedStages.toSeq
+    assert(newFailedStages.map(_.id) == Seq(0, 1))
+
+    scheduler.resubmitFailedStages()
+
+    // First shuffle map stage resubmitted and reran all tasks.
+    assert(taskSets(4).stageId == 0)
+    assert(taskSets(4).stageAttemptId == 1)
+    assert(taskSets(4).tasks.length == 2)
+
+    // Finish all stage.
+    complete(taskSets(4), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId1) === 
Some(Seq.empty))
+    assert(taskSets(4).tasks.head.localProperties.getProperty(
+      SparkContext.SHUFFLE_GENERATION_ID_PREFIX + shuffleId1.toString) == "1")
+
+    complete(taskSets(5), Seq(
+      (Success, makeMapStatus("hostC", 2)),
+      (Success, makeMapStatus("hostD", 2))))
+    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === 
Some(Seq.empty))
+    assert(taskSets(5).tasks.head.localProperties.getProperty(
+      SparkContext.SHUFFLE_GENERATION_ID_PREFIX + shuffleId2.toString) == "2")
+
+    complete(taskSets(6), Seq((Success, 11), (Success, 12)))
+
+    // Job successful ended.
+    assert(results === Map(0 -> 11, 1 -> 12))
+    results.clear()
+    assertDataStructuresEmpty()
+  }
+
+  test("SPARK-25341: continuous indeterminate stage roll back") {
+    // shuffleMapRdd1/2/3 are all indeterminate.
 
 Review comment:
   How about we make `shuffleMapRdd3` deterministic? That would prove that all 
parent stages need to rerun even if they are deterministic.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to