Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r139156014
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a task doesn't affect the target
- sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
// Verify that running a speculative task doesn't affect the target
- sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
+ sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-2", true)))
assert(numExecutorsTarget(manager) === 5)
assert(addExecutors(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
}
+ test("add executors capped by max concurrent tasks for a job group with
single core executors") {
+ val conf = new SparkConf()
+ .setMaster("myDummyLocalExternalClusterManager")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.testing", "true")
+ .set("spark.job.group1.maxConcurrentTasks", "2")
+ .set("spark.job.group2.maxConcurrentTasks", "5")
+ val sc = new SparkContext(conf)
+ contexts += sc
+ sc.setJobGroup("group1", "", false)
+
+ val manager = sc.executorAllocationManager.get
+ val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+ // Submit the job and stage start/submit events
+ sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+ // Verify that we're capped at the max number of concurrent tasks in the stage
+ assert(maxNumExecutorsNeeded(manager) === 2)
+
+ // Submit another stage in the same job
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+ assert(maxNumExecutorsNeeded(manager) === 2)
+
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+ // Submit a new job in the same job group
+ val stage2 = createStageInfo(2, 20)
+ sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+ assert(maxNumExecutorsNeeded(manager) === 2)
+
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+ // Set another jobGroup
+ sc.setJobGroup("group2", "", false)
+
+ val stage3 = createStageInfo(3, 20)
+ sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+ assert(maxNumExecutorsNeeded(manager) === 5)
+
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+ // Clear jobGroup
+ sc.clearJobGroup()
+
+ val stage4 = createStageInfo(4, 50)
+ sc.listenerBus.postToAll(SparkListenerJobStart(3, 0, Seq(stage4), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+ assert(maxNumExecutorsNeeded(manager) === 50)
+ }
+
+ test("add executors capped by max concurrent tasks for a job group with
multi cores executors") {
+ val conf = new SparkConf()
+ .setMaster("myDummyLocalExternalClusterManager")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.testing", "true")
+ .set("spark.job.group1.maxConcurrentTasks", "2")
+ .set("spark.job.group2.maxConcurrentTasks", "5")
+ .set("spark.executor.cores", "3")
+ val sc = new SparkContext(conf)
+ contexts += sc
+ sc.setJobGroup("group1", "", false)
+
+ val manager = sc.executorAllocationManager.get
+ val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+ // Submit the job and stage start/submit events
+ sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+ // Verify that we're capped at the max number of concurrent tasks in the stage
+ assert(maxNumExecutorsNeeded(manager) === 1)
+
+ // Submit another stage in the same job
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+ assert(maxNumExecutorsNeeded(manager) === 1)
+
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+ // Submit a new job in the same job group
+ val stage2 = createStageInfo(2, 20)
+ sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+ assert(maxNumExecutorsNeeded(manager) === 1)
+
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+ // Set another jobGroup
+ sc.setJobGroup("group2", "", false)
+
+ val stage3 = createStageInfo(3, 20)
+ sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+ assert(maxNumExecutorsNeeded(manager) === 2)
+
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+ // Clear jobGroup
+ sc.clearJobGroup()
+
+ val stage4 = createStageInfo(4, 50)
+ sc.listenerBus.postToAll(SparkListenerJobStart(3, 0, Seq(stage4), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+ assert(maxNumExecutorsNeeded(manager) === 17)
+ }
+
+ test("add executors capped by max concurrent tasks for concurrent job
groups") {
+ val conf = new SparkConf()
+ .setMaster("myDummyLocalExternalClusterManager")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.testing", "true")
+ .set("spark.job.group1.maxConcurrentTasks", "5")
+ .set("spark.job.group2.maxConcurrentTasks", "11")
+ .set("spark.job.group3.maxConcurrentTasks", "17")
+ val sc = new SparkContext(conf)
+ contexts += sc
+
+ val manager = sc.executorAllocationManager.get
+
+ // Submit a job in group1
+ sc.setJobGroup("group1", "", false)
+ val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10))
+ // Submit the job and stage start/submit events
+ sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+ // Verify that we're capped at the max number of concurrent tasks in the job group
+ assert(maxNumExecutorsNeeded(manager) === 2)
+
+ // Submit another stage in the same job
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+ assert(maxNumExecutorsNeeded(manager) === 5)
+
+ // Submit a job in group 2
+ sc.setJobGroup("group2", "", false)
+ val stage2 = createStageInfo(2, 20)
+ sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+ assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11
+
+ // Submit a job in group 3
+ sc.setJobGroup("group3", "", false)
+ val stage3 = createStageInfo(3, 50)
+ sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+ assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17
+
+ // Mark job in group 2 as complete
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded))
+ assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11
+
+ // Mark job in group 1 as complete
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+ assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5
+
+ // Submit a job without any job group
+ sc.clearJobGroup()
+ val stage4 = createStageInfo(4, 333)
+ sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+ assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333
+
+ // Mark job without job group as complete
+ sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4))
+ sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded))
+ assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333
+ }
+
+ test("add executors capped by max concurrent tasks for concurrent job
groups with speculation") {
+ val conf = new SparkConf()
+ .setMaster("myDummyLocalExternalClusterManager")
+ .setAppName("test-executor-allocation-manager")
+ .set("spark.dynamicAllocation.enabled", "true")
+ .set("spark.dynamicAllocation.testing", "true")
+ .set("spark.job.group1.maxConcurrentTasks", "5")
+ .set("spark.job.group2.maxConcurrentTasks", "11")
+ .set("spark.job.group3.maxConcurrentTasks", "17")
+ val sc = new SparkContext(conf)
+ contexts += sc
+
+ val manager = sc.executorAllocationManager.get
+
+ // Submit a job in group1
+ sc.setJobGroup("group1", "", false)
+ val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10))
+ // Submit the job and stage start/submit events
+ sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+ // Verify that we're capped at the max number of concurrent tasks in the job group
+ assert(maxNumExecutorsNeeded(manager) === 2)
+
+ // Submit a speculative task
+ sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0))
+ assert(maxNumExecutorsNeeded(manager) === 3) // 2 + 1 speculative task
+
+ // Submit another stage in the same job
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+ assert(maxNumExecutorsNeeded(manager) === 5)
+
+ // Submit a job in group 2
+ sc.setJobGroup("group2", "", false)
+ val stage2 = createStageInfo(2, 20)
+ sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+ assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11
+
+ // Submit a job in group 3
+ sc.setJobGroup("group3", "", false)
+ val stage3 = createStageInfo(3, 50)
+ sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+ assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17
+
--- End diff --
Send another speculative task here; otherwise this test is the same as the one above. Also have the speculative task from above finish.
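For reference, a rough sketch of what that could look like, reusing the helpers already shown in this diff (`createTaskInfo`, `maxNumExecutorsNeeded`). The asserted values are my assumption about how speculative tasks are counted against a group's cap, not something this PR pins down:

```scala
// Submit another speculative task, this time for group3's stage, so the
// speculation path is exercised while all three groups are active
sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(3))
// Assumed to stay capped at 5 + 11 + 17, since group3's cap of 17 already
// absorbs the extra speculative task
assert(maxNumExecutorsNeeded(manager) === 33)

// Have the speculative task from group1 (submitted earlier for stage 0)
// start and then finish successfully
val speculativeTask = createTaskInfo(0, 0, "executor-1", true)
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, speculativeTask))
sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, speculativeTask, null))
assert(maxNumExecutorsNeeded(manager) === 33) // group1 still capped at 5
```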