Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r139156205
  
    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -210,22 +216,282 @@ class ExecutorAllocationManagerSuite
         assert(numExecutorsToAdd(manager) === 1)
     
         // Verify that running a task doesn't affect the target
    -    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
    +    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
         assert(numExecutorsTarget(manager) === 5)
         assert(addExecutors(manager) === 0)
         assert(numExecutorsToAdd(manager) === 1)
     
         // Verify that running a speculative task doesn't affect the target
    -    sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true)))
    +    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-2", true)))
         assert(numExecutorsTarget(manager) === 5)
         assert(addExecutors(manager) === 0)
         assert(numExecutorsToAdd(manager) === 1)
       }
     
    +  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 50)
    +  }
    +
    +  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "2")
    +      .set("spark.job.group2.maxConcurrentTasks", "5")
    +      .set("spark.executor.cores", "3")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +    sc.setJobGroup("group1", "", false)
    +
    +    val manager = sc.executorAllocationManager.get
    +    val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the stage
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +
    +    // Submit a new job in the same job group
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 1)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
    +
    +    // Set another jobGroup
    +    sc.setJobGroup("group2", "", false)
    +
    +    val stage3 = createStageInfo(3, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
    +
    +    // Clear jobGroup
    +    sc.clearJobGroup()
    +
    +    val stage4 = createStageInfo(4, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 17)
    +  }
    +
    +  test("add executors capped by max concurrent tasks for concurrent job 
groups") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "5")
    +      .set("spark.job.group2.maxConcurrentTasks", "11")
    +      .set("spark.job.group3.maxConcurrentTasks", "17")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +
    +    val manager = sc.executorAllocationManager.get
    +
    +    // Submit a job in group1
    +    sc.setJobGroup("group1", "", false)
    +    val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the job group
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    // Submit a job in group 2
    +    sc.setJobGroup("group2", "", false)
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11
    +
    +    // Submit a job in group 3
    +    sc.setJobGroup("group3", "", false)
    +    val stage3 = createStageInfo(3, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17
    +
    +    // Mark job in group 2 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11
    +
    +    // Mark job in group 1 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5
    +
    +    // Submit a job without any job group
    +    sc.clearJobGroup()
    +    val stage4 = createStageInfo(4, 333)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333
    +
    +    // Mark job without job group as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333
    +  }
    +
    +  test("add executors capped by max concurrent tasks for concurrent job 
groups with speculation") {
    +    val conf = new SparkConf()
    +      .setMaster("myDummyLocalExternalClusterManager")
    +      .setAppName("test-executor-allocation-manager")
    +      .set("spark.dynamicAllocation.enabled", "true")
    +      .set("spark.dynamicAllocation.testing", "true")
    +      .set("spark.job.group1.maxConcurrentTasks", "5")
    +      .set("spark.job.group2.maxConcurrentTasks", "11")
    +      .set("spark.job.group3.maxConcurrentTasks", "17")
    +    val sc = new SparkContext(conf)
    +    contexts += sc
    +
    +    val manager = sc.executorAllocationManager.get
    +
    +    // Submit a job in group1
    +    sc.setJobGroup("group1", "", false)
    +    val stages = Seq(createStageInfo(0, 2), createStageInfo(1, 10))
    +    // Submit the job and stage start/submit events
    +    sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
    +
    +    // Verify that we're capped at number of max concurrent tasks in the job group
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +
    +    // submit a speculative task
    +    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(0))
    +    assert(maxNumExecutorsNeeded(manager) === 3) // should increase the no.
    +
    +    // Submit another stage in the same job
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
    +    assert(maxNumExecutorsNeeded(manager) === 5)
    +
    +    // Submit a job in group 2
    +    sc.setJobGroup("group2", "", false)
    +    val stage2 = createStageInfo(2, 20)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
    +    assert(maxNumExecutorsNeeded(manager) === 16) // 5 + 11
    +
    +    // Submit a job in group 3
    +    sc.setJobGroup("group3", "", false)
    +    val stage3 = createStageInfo(3, 50)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq(stage3), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
    +    assert(maxNumExecutorsNeeded(manager) === 33) // 5 + 11 + 17
    +
    +    // Mark job in group 2 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(1, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 22) // 33 - 11
    +
    +    // Mark job in group 1 as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 22 - 5
    +
    +    // Submit a job without any job group
    +    sc.clearJobGroup()
    +    val stage4 = createStageInfo(4, 333)
    +    sc.listenerBus.postToAll(SparkListenerJobStart(4, 0, Seq(stage4), sc.getLocalProperties))
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
    +    assert(maxNumExecutorsNeeded(manager) === 350) // 17 + 333
    +
    +    // Submit a speculative task in unbounded job group
    +    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4))
    +    sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(4))
    +    assert(maxNumExecutorsNeeded(manager) === 352) // should increase the no.
    +
    +    // Mark job without job group as complete
    +    sc.listenerBus.postToAll(SparkListenerStageCompleted(stage4))
    +    sc.listenerBus.postToAll(SparkListenerJobEnd(4, 20, JobSucceeded))
    +    assert(maxNumExecutorsNeeded(manager) === 17) // 350 - 333
    --- End diff ---
    
    Update the comment, you had 352 tasks with the speculative ones included.
    
    Can you perhaps finish a speculative task here and have it go to 351 first?
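    
    Something along these lines might work, purely as an untested sketch. I'm guessing the stage id 4 from above and the `SparkListenerTaskEnd` arguments from how the other tests in this suite post task events:
    
    ```scala
    // Start and finish one of the two speculative tasks submitted above so the
    // needed-executor count drops by one before the job without a group ends.
    val specTaskInfo = createTaskInfo(0, 0, "executor-1", true)
    sc.listenerBus.postToAll(SparkListenerTaskStart(4, 0, specTaskInfo))
    sc.listenerBus.postToAll(SparkListenerTaskEnd(4, 0, null, Success, specTaskInfo, null))
    assert(maxNumExecutorsNeeded(manager) === 351) // 352 - 1 finished speculative task
    ```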

