Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/5636#discussion_r35392436
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -475,7 +475,148 @@ class DAGSchedulerSuite
     assert(results === Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
+
+  test("Multiple consecutive stage failures should lead to stage being aborted.") {
+    // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+    // when we abort the stage. This message will also be consumed by the EventLoggingListener
+    // so this will propagate up to the user.
+    var ended = false
+    var jobResult : JobResult = null
+    class EndListener extends SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        jobResult = jobEnd.jobResult
+        ended = true
+      }
+    }
+
+    sc.listenerBus.addListener(new EndListener())
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostB", 1))))
+
+    for (x <- 1 to Stage.MAX_STAGE_FAILURES) {
+      // the 2nd ResultTask failed
+      complete(taskSets(1), Seq(
--- End diff --
a couple of things:
(a) your `makeMapStatus` calls when you complete `taskSets(0)` need to have
the second arg be `2`, since there are 2 reduce partitions. (Because of this,
in the current code the second stage never gets submitted.)
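For example, just a sketch of the adjusted completion, with the second arg matching the 2 reduce partitions of `reduceRdd`:
```scala
// complete the shuffle map stage with map outputs sized for 2 reduce partitions
complete(taskSets(0), Seq(
  (Success, makeMapStatus("hostA", 2)),
  (Success, makeMapStatus("hostB", 2))))
```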
(b) while debugging in DAGSchedulerSuite I sometimes find it helpful to just print out the task sets, something like:
```scala
println(s"iteration $x: ${taskSets.size} taskSets")
taskSets.zipWithIndex.foreach { case (ts, idx) => println(s"$idx: $ts") }
```
if you do that, you would see:
```
iteration 1: 2 taskSets
0: TaskSet 0.0
1: TaskSet 1.0
iteration 2: 3 taskSets
0: TaskSet 0.0
1: TaskSet 1.0
2: TaskSet 0.1
iteration 3: 3 taskSets
0: TaskSet 0.0
1: TaskSet 1.0
2: TaskSet 0.1
iteration 4: 3 taskSets
0: TaskSet 0.0
1: TaskSet 1.0
2: TaskSet 0.1
```
In iteration 2, you can see that there is now a third taskSet, TaskSet 0.1, which is the second attempt of stage 0. You need to complete the tasks associated with that taskSet. And if you did, you would then see a fourth task set, TaskSet 1.1, for the second attempt of stage 1. You should fail that, then see a third attempt for stage 0, etc.
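Roughly, one iteration of that loop could look like the sketch below (the `FetchFailed` arguments, `makeBlockManagerId`, and `scheduler.resubmitFailedStages()` follow the pattern used in other tests in this suite; treat it as an outline rather than the exact code):
```scala
for (attempt <- 1 to Stage.MAX_STAGE_FAILURES) {
  // fail a task in the *latest* attempt of the result stage with a fetch failure,
  // so each iteration registers a genuinely new failed stage attempt
  complete(taskSets.last, Seq(
    (Success, 42),
    (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 1, "ignored"), null)))

  if (attempt < Stage.MAX_STAGE_FAILURES) {
    // the fetch failure causes the map stage to be resubmitted (TaskSet 0.<attempt>);
    // finish it so the result stage gets a fresh attempt (TaskSet 1.<attempt>)
    scheduler.resubmitFailedStages()
    complete(taskSets.last, Seq(
      (Success, makeMapStatus("hostA", 2)),
      (Success, makeMapStatus("hostB", 2))))
  }
}
```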
The reason this appears to work now, even though you never actually fail the additional attempts for the later task sets, is because you do trigger 4 failures for stage 1. However, they are all from the same task set. In fact, we want to make sure we do *not* give up on a stage if it has multiple failures from one task set. E.g., say attempt 1 fails, then attempt 2 fails, but then while you are running attempt 3, you get 2 additional failures from attempts 1 & 2. That is still only 2 failed stage attempts. (This is why we needed the stage attempt id to be able to solve this properly.)
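Just to illustrate the counting idea (hypothetical names, not the PR's actual fields):
```scala
import scala.collection.mutable

// Illustration only: count *distinct* failed stage attempts, so that several late
// failures arriving from the same (already superseded) attempt are not
// double-counted toward the abort threshold.
val failedAttemptIds = mutable.HashSet[Int]()

def shouldAbort(stageAttemptId: Int): Boolean = {
  failedAttemptIds += stageAttemptId
  failedAttemptIds.size >= Stage.MAX_STAGE_FAILURES
}
```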
Hope that clarifies things a little bit -- lemme know if you need more
explanation / want me to flesh out one of these tests a little more etc.