Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r139156205

    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsToAdd(manager) === 1)

         // Verify that running a task doesn't affect the target
    -    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
    +    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
         assert(numExecutorsTarget(manager) === 5)
         assert(addExecutors(manager) === 0)
         assert(numExecutorsToAdd(manager) === 1)

         // Verify that running a speculative task doesn't affect the target
    -    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
    +    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-2", true)))
         assert(numExecutorsTarget(manager) === 5)
         assert(addExecutors(manager) === 0)
         assert(numExecutorsToAdd(manager) === 1)
       }

    +  test("add executors capped by max concurrent tasks for a job group with single core executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 50)
    +  }
+ + test("add executors capped by max concurrent tasks for a job group with multi cores executors") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + .set("spark.executor.cores", "3") + val sc = new SparkContext(conf) + contexts += sc + sc.setJobGroup("group1", "", false) + + val manager = sc.executorAllocationManager.get + val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the stage + assert(maxNumExecutorsNeeded(manager) === 1) + + // Submit another stage in the same job + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) + assert(maxNumExecutorsNeeded(manager) === 1) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) + sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) + sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + + // Submit a new job in the same job group + val stage2 = createStageInfo(2, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) + assert(maxNumExecutorsNeeded(manager) === 1) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) + sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + + // Set another jobGroup + sc.setJobGroup("group2", "", false) + + val stage3 = createStageInfo(3, 20) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) + assert(maxNumExecutorsNeeded(manager) === 2) + + sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) + sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + + // Clear jobGroup + sc.clearJobGroup() + + val stage4 = createStageInfo(4, 50) + sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) + assert(maxNumExecutorsNeeded(manager) === 17) + } + + test("add executors capped by max concurrent tasks for concurrent job groups") { + val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "5") + .set("spark.job.group2.maxConcurrentTasks", "11") + .set("spark.job.group3.maxConcurrentTasks", "17") + val sc = new SparkContext(conf) + contexts += sc + + val manager = sc.executorAllocationManager.get + + // Submit a job in group1 + sc.setJobGroup("group1", "", false) + val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10)) + // Submit the job and stage start/submit events + sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + + // Verify that we're capped at number of max concurrent tasks in the job group + 
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    // Submit a job in group 2
    +    sc.setJobGroup("group2", "", false)
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11
    +
    +    // Submit a job in group 3
    +    sc.setJobGroup("group3", "", false)
    +    val stage3 = createStageInfo(3, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17
    +
    +    // Mark job in group 2 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11
    +
    +    // Mark job in group 1 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5
    +
    +    // Submit a job without any job group
    +    sc.clearJobGroup()
    +    val stage4 = createStageInfo(4, 333)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333
    +
    +    // Mark job without job group as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333
    +  }
    +
    +  test("add executors capped by max concurrent tasks for concurrent job groups with speculation") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "5")
    +      .set("spark.job.group2.maxConcurrentTasks", "11")
    +      .set("spark.job.group3.maxConcurrentTasks", "17")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +
    +    val manager = sc.executorAllocationManager.get
    +
    +    // Submit a job in group1
    +    sc.setJobGroup("group1", "", false)
    +    val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the job group
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit a speculative task
    +    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0))
    +    assert(maxNumExecutorsNeeded(manager) === 3) // should increase the count
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    // Submit a job in group 2
    +    sc.setJobGroup("group2", "", false)
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11
    +
    +    // Submit a job in group 3
    +    sc.setJobGroup("group3", "", false)
    +    val stage3 = createStageInfo(3, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17
    +
    +    // Mark job in group 2 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11
    +
    +    // Mark job in group 1 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5
    +
    +    // Submit a job without any job group
    +    sc.clearJobGroup()
    +    val stage4 = createStageInfo(4, 333)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333
    +
    +    // Submit speculative tasks in the unbounded job group
    +    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4))
    +    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4))
    +    assert(maxNumExecutorsNeeded(manager) === 352) // should increase the count
    +
    +    // Mark job without job group as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333
    --- End diff --

Update the comment: you had 352 tasks once the speculative ones were submitted. Can you perhaps finish a speculative task here first and have the estimate go to 351 before the stage completes?
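Something along these lines would do it (an untested sketch, not part of the patch: it reuses this suite's createTaskInfo helper, passes null for taskType and metrics in SparkListenerTaskEnd, and assumes the patch decrements the pending speculative count when a speculative attempt finishes; the task id, index, and executor id are made up):

    // Hypothetical addition before completing stage4: finish one of the two
    // speculative attempts so the estimate drops from 352 to 351 first.
    val specTaskInfo = createTaskInfo(0, 0, "executor-1", true)
    sc.listenerBus.postToAll(SparkListenerTaskStart(4, 0, specTaskInfo))
    sc.listenerBus.postToAll(SparkListenerTaskEnd(4, 0, null, Success, specTaskInfo, null))
    assert(maxNumExecutorsNeeded(manager) === 351) // 352 - 1 finished speculative task

The existing stage-completed and job-end events can then follow, with the final assert still at 17.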