Repository: spark
Updated Branches:
  refs/heads/master f6c447f87 -> 3ddb9b323


[SPARK-10247] [CORE] improve readability of a test case in DAGSchedulerSuite

This is pretty minor; I'm just trying to improve the readability of
`DAGSchedulerSuite`, and I figure every bit helps. Before, whenever I read this
test, I never knew what "should work" and "should be ignored" really meant --
this adds some asserts & updates comments to make it clearer. Also some
reformatting per a suggestion from markhamstra on
https://github.com/apache/spark/pull/7699

Author: Imran Rashid <iras...@cloudera.com>

Closes #8434 from squito/SPARK-10247.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ddb9b32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ddb9b32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ddb9b32

Branch: refs/heads/master
Commit: 3ddb9b32335154e47890a0c761e0dfea3ccaac7b
Parents: f6c447f
Author: Imran Rashid <iras...@cloudera.com>
Authored: Wed Sep 2 22:14:50 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Sep 2 22:14:50 2015 -0700

----------------------------------------------------------------------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 57 ++++++++++++++++----
 1 file changed, 47 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ddb9b32/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 62957c6..80f64de 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -926,27 +926,64 @@ class DAGSchedulerSuite
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
+
     // pretend we were told hostA went away
     val oldEpoch = mapOutputTracker.getEpoch
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
+
+    // now start completing some tasks in the shuffle map stage, under different hosts
+    // and epochs, and make sure scheduler updates its state correctly
     val taskSet = taskSets(0)
+    val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage.numAvailableOutputs === 0)
+
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
-      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
-    // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
-      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSet.tasks(0),
+      Success,
+      makeMapStatus("hostA", reduceRdd.partitions.size),
+      null,
+      createFakeTaskInfo(),
+      null))
+    assert(shuffleStage.numAvailableOutputs === 0)
+
+    // should work because it's a non-failed host (so the available map outputs will increase)
+    runEvent(CompletionEvent(
+      taskSet.tasks(0),
+      Success,
+      makeMapStatus("hostB", reduceRdd.partitions.size),
+      null,
+      createFakeTaskInfo(),
+      null))
+    assert(shuffleStage.numAvailableOutputs === 1)
+
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
-      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
-    // should work because it's a new epoch
+    runEvent(CompletionEvent(
+      taskSet.tasks(0),
+      Success,
+      makeMapStatus("hostA", reduceRdd.partitions.size),
+      null,
+      createFakeTaskInfo(),
+      null))
+    assert(shuffleStage.numAvailableOutputs === 1)
+
+    // should work because it's a new epoch, which will increase the number of available map
+    // outputs, and also finish the stage
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
-      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSet.tasks(1),
+      Success,
+      makeMapStatus("hostA", reduceRdd.partitions.size),
+      null,
+      createFakeTaskInfo(),
+      null))
+    assert(shuffleStage.numAvailableOutputs === 2)
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
            HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
+
+    // finish the next stage normally, which completes the job
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()

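For context on the epoch mechanism this test exercises: the map output tracker's
epoch is bumped whenever an executor is lost, each task carries the epoch it was
launched under, and the scheduler drops shuffle-map completions whose epoch is
not newer than the epoch at which their host failed. Below is a minimal,
self-contained Scala sketch of that filtering rule; the names (EpochFilterSketch,
Completion, failedEpoch, availableOutputs) are illustrative assumptions, not
Spark's actual DAGScheduler internals.

    // Minimal sketch (not Spark's real code) of epoch-based filtering of
    // stale shuffle-map-task completions, as exercised by the test above.
    object EpochFilterSketch {
      // A completed map task: the host it ran on and the epoch it launched under.
      case class Completion(host: String, epoch: Long)

      var currentEpoch: Long = 0L                 // bumped on every executor loss
      var failedEpoch: Map[String, Long] = Map()  // host -> epoch at which it was lost
      var availableOutputs: Int = 0               // like ShuffleMapStage.numAvailableOutputs

      def executorLost(host: String): Unit = {
        currentEpoch += 1
        failedEpoch += (host -> currentEpoch)
      }

      // "Should be ignored": the host failed and the task's epoch is not newer
      // than the failure. "Should work": a non-failed host, or a newer epoch.
      def handleCompletion(c: Completion): Unit = {
        val stale = failedEpoch.get(c.host).exists(lostAt => c.epoch <= lostAt)
        if (!stale) availableOutputs += 1
      }

      def main(args: Array[String]): Unit = {
        executorLost("hostA")                         // like runEvent(ExecutorLost("exec-hostA"))
        handleCompletion(Completion("hostA", 0L))     // ignored: too old
        assert(availableOutputs == 0)
        handleCompletion(Completion("hostB", 0L))     // works: non-failed host
        assert(availableOutputs == 1)
        handleCompletion(Completion("hostA", currentEpoch)) // works: new epoch
        assert(availableOutputs == 2)
      }
    }

The asserts mirror the ones the patch adds: the stale hostA completion leaves
the available-output count untouched, while the hostB completion and the
new-epoch hostA completion each raise it by one.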
