[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21019


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21019#discussion_r181889300
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1433,6 +1426,16 @@ class DAGScheduler(
 }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+// Mark any map-stage jobs waiting on this stage as finished
+if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
--- End diff --

This doesn't seem necessary, as it's already handled at the call sites ...
but IMO it seems safer to include it, in case this gets invoked elsewhere in
the future.
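
For reference, the full method being discussed presumably looks like the sketch below. The guard on the first line is taken from the diff above; the body under the guard is an assumption reconstructed from the surrounding DAGScheduler code, and may not match the patch exactly:

```scala
private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
  // Mark any map-stage jobs waiting on this stage as finished
  if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
    // Fetch shuffle statistics once, then notify every waiting map-stage job
    val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
    for (job <- shuffleStage.mapStageJobs) {
      markMapStageJobAsFinished(job, stats)
    }
  }
}
```

With the guard in place, the method is a no-op unless the stage's map outputs are all available, which is what makes it safe to call from additional call sites later.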


---




[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21019#discussion_r181891158
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 assertDataStructuresEmpty()
   }
 
+  test("Trigger mapstage's job listener in submitMissingTasks") {
+val rdd1 = new MyRDD(sc, 2, Nil)
+val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+val listener1 = new SimpleListener
+val listener2 = new SimpleListener
+
+submitMapStage(dep1, listener1)
+submitMapStage(dep2, listener2)
+
+// Complete the stage0.
+assert(taskSets(0).stageId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+  (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+assert(listener1.results.size === 1)
+
+// When attempting stage1, trigger a fetch failure.
+assert(taskSets(1).stageId === 1)
+complete(taskSets(1), Seq(
+  (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+  (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+scheduler.resubmitFailedStages()
+// Stage1 listener should not have a result yet
+assert(listener2.results.size === 0)
+
+// Speculative task succeeded in stage1.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  Success,
+  makeMapStatus("hostD", rdd2.partitions.length)))
+// stage1 listener still should not have a result, though there's no missing partitions
+// in it. Because stage1 is not inside runningStages at this moment.
+assert(listener2.results.size === 0)
+
+// Stage0 should now be running as task set 2; make its task succeed
+assert(taskSets(2).stageId === 0)
+complete(taskSets(2), Seq(
+  (Success, makeMapStatus("hostC", rdd2.partitions.length))))
+assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
--- End diff --

can just use `Set` here


---




[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21019#discussion_r181892493
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 assertDataStructuresEmpty()
   }
 
+  test("Trigger mapstage's job listener in submitMissingTasks") {
+val rdd1 = new MyRDD(sc, 2, Nil)
+val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+val listener1 = new SimpleListener
+val listener2 = new SimpleListener
+
+submitMapStage(dep1, listener1)
+submitMapStage(dep2, listener2)
+
+// Complete the stage0.
+assert(taskSets(0).stageId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+  (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+assert(listener1.results.size === 1)
+
+// When attempting stage1, trigger a fetch failure.
+assert(taskSets(1).stageId === 1)
+complete(taskSets(1), Seq(
+  (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+  (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+scheduler.resubmitFailedStages()
+// Stage1 listener should not have a result yet
+assert(listener2.results.size === 0)
+
+// Speculative task succeeded in stage1.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  Success,
+  makeMapStatus("hostD", rdd2.partitions.length)))
+// stage1 listener still should not have a result, though there's no missing partitions
+// in it. Because stage1 is not inside runningStages at this moment.
+assert(listener2.results.size === 0)
+
+// Stage0 should now be running as task set 2; make its task succeed
+assert(taskSets(2).stageId === 0)
+complete(taskSets(2), Seq(
+  (Success, makeMapStatus("hostC", rdd2.partitions.length))))
+assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+// After stage0 is finished, stage1 will be submitted and found there is no missing
+// partitions in it. Then listener got triggered.
+assert(listener2.results.size === 1)
+  }
--- End diff --

can you also add `assertDataStructuresEmpty()` please?  I know it's not
really related to your change, but it's nice to include this in all the tests.


---




[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-16 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21019#discussion_r181778477
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1433,6 +1426,16 @@ class DAGScheduler(
 }
   }
 
+  private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+// Mark any map-stage jobs waiting on this stage as finished
+if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
--- End diff --

Why do we need to double check that `shuffleStage.isAvailable` here?


---




[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-16 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21019#discussion_r181779186
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2146,6 +2146,57 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 assertDataStructuresEmpty()
   }
 
+  test("Trigger mapstage's job listener in submitMissingTasks") {
+val rdd1 = new MyRDD(sc, 2, Nil)
+val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+val listener1 = new SimpleListener
+val listener2 = new SimpleListener
+
+submitMapStage(dep1, listener1)
+submitMapStage(dep2, listener2)
+
+// Complete the stage0.
+assert(taskSets(0).stageId === 0)
+complete(taskSets(0), Seq(
+  (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+  (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+assert(listener1.results.size === 1)
+
+// When attempting stage1, trigger a fetch failure.
+assert(taskSets(1).stageId === 1)
+complete(taskSets(1), Seq(
+  (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+  (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+scheduler.resubmitFailedStages()
+// Stage1 listener should not have a result yet
+assert(listener2.results.size === 0)
+
+// Speculative task succeeded in stage1.
+runEvent(makeCompletionEvent(
+  taskSets(1).tasks(1),
+  Success,
+  makeMapStatus("hostD", rdd2.partitions.length)))
+// stage1 listener still should not have a result, though there's no missing partitions
+// in it. Because stage1 is not inside runningStages at this moment.
--- End diff --

nit: ```Because stage1 has been failed and is not inside `runningStages` at this moment.```


---




[GitHub] spark pull request #21019: [SPARK-23948] Trigger mapstage's job listener in ...

2018-04-09 Thread jinxing64
GitHub user jinxing64 opened a pull request:

https://github.com/apache/spark/pull/21019

[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks

## What changes were proposed in this pull request?

A map stage is submitted from `SparkContext.submitMapStage` to the `DAGScheduler`; currently `markMapStageJobAsFinished` is called only in ().

But consider the following scenario:
1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. We submit stage1 by `submitMapStage`, and there are 10 missing tasks in stage1;
3. While stage1 is running, a `FetchFailed` occurs, and stage0 and stage1 get resubmitted as stage0_1 and stage1_1;
4. While stage0_1 is running, speculative tasks from the old stage1 complete successfully, but stage1 is not inside `runningStages`. So even though all splits (including the speculative tasks) in stage1 have succeeded, stage1's job listener is not called;
5. stage0_1 finishes and stage1_1 starts running. When `submitMissingTasks` runs, there are no missing tasks, but with the current code the job listener is still not triggered.

## How was this patch tested?

Not added yet.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-23948

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21019


commit 685124a11b789af2a42b4978e25ed404b2a15176
Author: jinxing 
Date:   2018-04-10T03:33:02Z

[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks




---
