[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-20 Thread jinxing64
Github user jinxing64 closed the pull request at:

https://github.com/apache/spark/pull/16901





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r101064021
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    // Complete both tasks in rddA.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Complete the task(partition=1) which is from the old attempt(stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1, stageAttempt=0), there's no
+    // pending partitions for stage(stageId=1) now, thus downstream stage should be submitted,
+    // though there's still a running task(stageId=1, stageAttempt=1, partitionId=1)
+    // in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
--- End diff --

https://issues.apache.org/jira/browse/SPARK-19596





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r101061681
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    // Complete both tasks in rddA.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Complete the task(partition=1) which is from the old attempt(stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1, stageAttempt=0), there's no
+    // pending partitions for stage(stageId=1) now, thus downstream stage should be submitted,
+    // though there's still a running task(stageId=1, stageAttempt=1, partitionId=1)
+    // in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
--- End diff --

I was going to suggest adding a check here to make sure that all prior TaskSetManagers are marked as zombies. But (a) you can't check that, since the DAGScheduler doesn't have a handle on the TaskSetManagers, and (b) more importantly, the prior TSMs actually are *not* marked as zombies. So they may continue to submit tasks even though they're not necessary.

I will file a separate bug for that -- it's a performance issue, not a correctness issue, so not critical. But this isn't the same as the old "kill running tasks when marking a TSM as a zombie" issue -- in this case, the problem is that the TSM may continue to launch *new* tasks.
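For readers following along, here is a minimal sketch of the behavior being described, under the assumption of a drastically simplified stand-in for Spark's TaskSetManager (the class below is hypothetical; only the isZombie flag and the resourceOffer guard mirror the real, private scheduler internals):

    // Hypothetical, simplified TaskSetManager: a TSM that is not a zombie
    // keeps offering new tasks, even after a newer attempt of the same
    // stage has been submitted -- the unnecessary work described above.
    class SketchTaskSetManager(numTasks: Int) {
      var isZombie: Boolean = false
      private var nextTaskIndex = 0

      // Returns the next task index to launch, or None if this TSM is a
      // zombie or has exhausted its tasks.
      def resourceOffer(): Option[Int] = {
        if (isZombie || nextTaskIndex >= numTasks) None
        else { nextTaskIndex += 1; Some(nextTaskIndex - 1) }
      }
    }

    val oldAttempt = new SketchTaskSetManager(numTasks = 2)
    assert(oldAttempt.resourceOffer().isDefined) // still launches new tasks
    oldAttempt.isZombie = true
    assert(oldAttempt.resourceOffer().isEmpty)   // no new tasks once a zombie

Marking the TSM a zombie only stops *new* launches; tasks already running are unaffected, which is why this is a performance rather than a correctness concern.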





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r101059074
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
--- End diff --

can you rename to "After a fetch failure, success ..."?
Really minor, but I had to read this twice.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100968529
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1), there's no pending
--- End diff --

Yes, the success should be moved. Sorry about that; I'll rectify it.





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100885960
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
--- End diff --

add a comment saying something like "Complete both tasks in rddA"





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100885083
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
--- End diff --

can you add a brief comment here with something like:

// Create 3 RDDs with shuffle dependencies on each other: A <--- B <--- C





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100886335
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
--- End diff --

add a comment here saying something like "Both original tasks in rddA 
should be marked as failed, because they ran on the failed hostA, so both 
should be resubmitted.  Complete them successfully."





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100886876
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1), there's no pending
--- End diff --

It looks like the success above is from the newer attempt of the stage (since you're taking the task from taskSets(3), not taskSets(1)), which is inconsistent with the comment. I think perhaps the intention here was to *not* finish one of the tasks from taskSets(1) the first time around (i.e., eliminate the Success on line 2185) and then move that success here, instead of completing the task from the more recent task set?
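For reference, the later revision of this diff (quoted earlier in this thread) does exactly that:

    // From the updated revision: partition 0 completes from the new attempt
    // (taskSets(3)), while partition 1's success arrives from the old attempt
    // (taskSets(1)), matching the comment's intent.
    runEvent(makeCompletionEvent(
      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))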





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-13 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/16901#discussion_r100886451
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 }
   }
 
+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
--- End diff --

should the second condition in the assert be checking taskSets(3) (not taskSets(2) again)?
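For reference, the later revision of this diff (quoted earlier in this thread) corrects the assertion:

    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
      taskSets(3).tasks.size === 2)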





[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.

2017-02-12 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/16901

[SPARK-19565] Improve DAGScheduler tests.

## What changes were proposed in this pull request?

This is related to #16620.
When a fetch fails, the stage will be resubmitted, so there can be running tasks from both the old and new stage attempts. This PR adds a test checking that a success from a task of the old stage attempt is taken as valid, and that the partitionId is removed from the stage's pendingPartitions accordingly. When pendingPartitions is empty, the downstream stage can be scheduled, even though there are still running tasks in the active (new) stage attempt.
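A minimal sketch of the scheduling rule this test exercises, under a deliberately simplified model (the class below is hypothetical; pendingPartitions names the real field on a shuffle map stage, but the DAGScheduler's actual bookkeeping is more involved):

    import scala.collection.mutable

    // Hypothetical model: tracks which partitions of a shuffle map stage
    // still need a successful task, irrespective of which attempt runs it.
    class PendingPartitionsModel(numPartitions: Int) {
      val pendingPartitions: mutable.HashSet[Int] =
        mutable.HashSet((0 until numPartitions): _*)

      // A success from ANY attempt (old or new) supplies the map output for
      // that partition, so the partition is no longer pending.
      def onShuffleMapTaskSuccess(partitionId: Int): Unit = {
        pendingPartitions -= partitionId
      }

      // The downstream stage may be submitted once nothing is pending, even
      // if the newest attempt still has tasks running for those partitions.
      def canSubmitDownstream: Boolean = pendingPartitions.isEmpty
    }

    val stage1 = new PendingPartitionsModel(numPartitions = 2)
    stage1.onShuffleMapTaskSuccess(partitionId = 0) // from the new attempt
    stage1.onShuffleMapTaskSuccess(partitionId = 1) // from the OLD attempt
    assert(stage1.canSubmitDownstream)              // downstream stage runs

With two partitions, completing partition 0 from the new attempt and partition 1 from the old attempt empties the pending set, mirroring the sequence in the test above.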


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19565

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16901.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16901


commit f5df21405f1eb31c7f32ffd7b32ed02abbc6d033
Author: jinxing 
Date:   2017-02-12T12:39:53Z

[SPARK-19565] Improve DAGScheduler tests.



