[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 closed the pull request at: https://github.com/apache/spark/pull/16901

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r101064021

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    // Complete both tasks in rddA.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0); task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Both tasks in rddB should be resubmitted, because neither of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Complete the task(partitionId=1) from the old attempt (stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from the old attempt of stage(stageId=1, stageAttempt=0), there
+    // are no pending partitions for stage(stageId=1) now, so the downstream stage should be
+    // submitted, even though there is still a running task(stageId=1, stageAttempt=1,
+    // partitionId=1) in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
--- End diff --

https://issues.apache.org/jira/browse/SPARK-19596
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r101061681

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    // Complete both tasks in rddA.
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0); task(partitionId=1) is still running.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Both tasks in rddB should be resubmitted, because neither of them has succeeded.
+    // Complete the task(partitionId=0) successfully. Task(partitionId=1) is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1 &&
+      taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Complete the task(partitionId=1) from the old attempt (stageId=1, stageAttempt=0)
+    // successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from the old attempt of stage(stageId=1, stageAttempt=0), there
+    // are no pending partitions for stage(stageId=1) now, so the downstream stage should be
+    // submitted, even though there is still a running task(stageId=1, stageAttempt=1,
+    // partitionId=1) in the active stage attempt.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
--- End diff --

I was going to suggest adding a check here to make sure that all prior TaskSetManagers are marked as zombies. But (a) you can't check that, since the DAGScheduler doesn't have a handle on the TaskSetManagers, and (b) more importantly, the prior TSMs actually are *not* marked as zombies, so they may continue to submit tasks even though they're not necessary. I will file a separate bug for that -- it's a performance issue, not a correctness issue, so not critical. But this isn't the same as the old "kill running tasks when marking a TSM as a zombie" issue -- in this case, the problem is that the TSM may continue to launch *new* tasks.
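The zombie semantics squito describes can be sketched as a toy model. This is a hypothetical illustration, not Spark's real `TaskSetManager`: the class name, `resourceOffer` signature, and bookkeeping here are simplified stand-ins. The point it demonstrates is the one in the comment above: a zombie task set must stop launching *new* tasks, while tasks already running are simply allowed to finish.

```scala
import scala.collection.mutable

// Hypothetical toy model of zombie semantics -- not Spark's actual
// TaskSetManager. `pending` holds tasks not yet launched; `running`
// holds tasks that have been handed out but have not finished.
class TaskSetManagerModel(numTasks: Int) {
  var isZombie: Boolean = false
  private val pending = mutable.Queue(0 until numTasks: _*)
  private val running = mutable.Set.empty[Int]

  // A zombie TSM refuses to hand out new tasks; a live one dequeues
  // the next pending task and marks it running.
  def resourceOffer(): Option[Int] =
    if (isZombie || pending.isEmpty) None
    else {
      val task = pending.dequeue()
      running += task
      Some(task)
    }

  // Marking a TSM as a zombie does NOT kill running tasks; it only
  // stops future launches.
  def markAsZombie(): Unit = { isZombie = true }

  def runningTasks: Int = running.size
}
```

Under this model, a superseded attempt that is never marked as a zombie keeps answering resource offers with fresh tasks, which is exactly the wasted work the separate bug is about.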
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r101059074

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,63 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
--- End diff --

Can you rename this to "After a fetch failure, success ..."? Really minor, but I had to read this twice.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100968529

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1), there's no pending
--- End diff --

Yes, the success should be moved. Sorry for this; I'll rectify.
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100885960

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
--- End diff --

Add a comment saying something like "Complete both tasks in rddA".
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100885083

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
--- End diff --

Can you add a brief comment here with something like: // Create 3 RDDs with shuffle dependencies on each other: A <--- B <--- C
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100886335

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
--- End diff --

Add a comment here saying something like "Both original tasks in rddA should be marked as failed, because they ran on the failed hostA, so both should be resubmitted. Complete them successfully."
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100886876

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // Thanks to the success from old attempt of stage(stageId=1), there's no pending
--- End diff --

It looks like the success above is from the newer attempt of the stage (since you're taking the task from taskSets(3), not taskSets(1)), which is inconsistent with the comment.

I think perhaps the intention here was to *not* finish one of the tasks from taskSets(1) the first time around (i.e., eliminate the Success on line 2185) and then move that success here (instead of completing the task from the more recent task set)?
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/16901#discussion_r100886451

--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2161,6 +2161,48 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   }
 }

+  test("After fetching failed, success of old attempt of stage should be taken as valid.") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA))
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB))
+
+    submit(rddC, Array(0, 1))
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed on hostA for task(partitionId=0) and success on hostB for task(partitionId=1)
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"), null),
+      (Success, makeMapStatus("hostB", 2))))
+
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
--- End diff --

Should the second condition in the assert be checking taskSets(3) (not taskSets(2) again)?
[GitHub] spark pull request #16901: [SPARK-19565] Improve DAGScheduler tests.
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/16901

[SPARK-19565] Improve DAGScheduler tests.

## What changes were proposed in this pull request?

This is related to #16620. When a fetch fails, the stage is resubmitted, so there can be running tasks from both the old and the new stage attempt. This PR adds a test checking that a success from a task of the old stage attempt is taken as valid, and that the partitionId is removed from the stage's pendingPartitions accordingly. When pendingPartitions is empty, the downstream stage can be scheduled, even though there are still running tasks in the active (new) stage attempt.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jinxing64/spark SPARK-19565

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16901.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16901

commit f5df21405f1eb31c7f32ffd7b32ed02abbc6d033
Author: jinxing
Date: 2017-02-12T12:39:53Z

[SPARK-19565] Improve DAGScheduler tests.
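The pendingPartitions bookkeeping the description relies on can be sketched as a small toy model. This is a hypothetical illustration (the class and method names here are invented, not Spark's actual `ShuffleMapStage`), assuming only the semantics stated above: a success from *any* attempt of the stage clears that partition, and the downstream stage becomes schedulable once nothing is pending.

```scala
import scala.collection.mutable

// Hypothetical minimal model of the bookkeeping described above -- not
// Spark's real ShuffleMapStage. A partition stays "pending" until some
// attempt of the stage reports a success for it; which attempt reported
// the success does not matter.
class StageModel(val stageId: Int, numPartitions: Int) {
  val pendingPartitions: mutable.HashSet[Int] =
    mutable.HashSet(0 until numPartitions: _*)

  // Called on any task success, regardless of the stage attempt id
  // the task came from.
  def onTaskSuccess(partitionId: Int): Unit =
    pendingPartitions -= partitionId

  // The downstream stage may be submitted once nothing is pending,
  // even if a newer attempt still has tasks running.
  def canSubmitDownstream: Boolean = pendingPartitions.isEmpty
}
```

In the scenario the test exercises, one partition is completed by the new attempt and the other by the old attempt; in this model both paths call `onTaskSuccess`, so `canSubmitDownstream` becomes true without waiting for the new attempt's still-running duplicate task.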