Github user kayousterhout commented on a diff in the pull request:
https://github.com/apache/spark/pull/16892#discussion_r102821009
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1569,24 +1569,44 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}
- test("run trivial shuffle with out-of-band failure and retry") {
+ /**
+ * In this test, we run a map stage where one of the executors fails but we still receive a
+ * "zombie" complete message from a task that ran on that executor. We want to make sure the
+ * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
+ * that the stage is only marked as finished once that task completes.
+ */
+ test("run trivial shuffle with out-of-band executor failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
- // blockManagerMaster.removeExecutor("exec-hostA")
- // pretend we were told hostA went away
+ // Tell the DAGScheduler that hostA was lost.
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
- // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
- // rather than marking it is as failed and waiting.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
+
+ // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
+ // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler
+ // should re-submit the stage.
+ assert(taskSets.size === 2)
+
+ // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
+ // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
+ // alive executors).
+ assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
--- End diff --
Good idea, done.