GitHub user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22001#discussion_r209294652
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala ---
    @@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
         assert(smaller.size === 4)
       }
     
    +  test("compute max number of concurrent tasks can be launched") {
    +    val conf = new SparkConf()
    +      .setMaster("local-cluster[4, 3, 1024]")
    +      .setAppName("test")
    +    sc = new SparkContext(conf)
    +    eventually(timeout(10.seconds)) {
    +      // Ensure all executors have been launched.
    +      assert(sc.getExecutorIds().length == 4)
    +    }
    +    assert(sc.maxNumConcurrentTasks() == 12)
    +  }
    +
    +  test("compute max number of concurrent tasks can be launched when 
spark.task.cpus > 1") {
    +    val conf = new SparkConf()
    +      .set("spark.task.cpus", "2")
    +      .setMaster("local-cluster[4, 3, 1024]")
    +      .setAppName("test")
    +    sc = new SparkContext(conf)
    +    eventually(timeout(10.seconds)) {
    +      // Ensure all executors have been launched.
    +      assert(sc.getExecutorIds().length == 4)
    +    }
    +    // Each executor can only launch one task since `spark.task.cpus` is 2.
    +    assert(sc.maxNumConcurrentTasks() == 4)
    +  }
    +
    +  test("compute max number of concurrent tasks can be launched when some 
executors are busy") {
    +    val conf = new SparkConf()
    +      .set("spark.task.cpus", "2")
    +      .setMaster("local-cluster[4, 3, 1024]")
    +      .setAppName("test")
    +    sc = new SparkContext(conf)
    +    val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
    +      Thread.sleep(1000)
    +      iter
    +    }
    +    val taskStarted = new AtomicBoolean(false)
    +    val taskEnded = new AtomicBoolean(false)
    +    val listener = new SparkListener() {
    +      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +        taskStarted.set(true)
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        taskEnded.set(true)
    +      }
    +    }
    +
    +    try {
    +      sc.addSparkListener(listener)
    +      eventually(timeout(10.seconds)) {
    +        // Ensure all executors have been launched.
    +        assert(sc.getExecutorIds().length == 4)
    +      }
    +
    +      // Submit a job to trigger some tasks on active executors.
    +      testSubmitJob(sc, rdd)
    +
    +      eventually(timeout(5.seconds)) {
    --- End diff ---
    
    Maybe safer to let the task sleep longer and cancel the task once the conditions are met.
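    
    Something like this, as a rough sketch (untested; `cancelAllJobs` is just one way
    to do the cancellation, a dedicated job group would work too):
    
        // Sleep (nearly) forever so the assertions cannot race with the
        // tasks finishing on their own.
        val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
          Thread.sleep(Long.MaxValue)
          iter
        }
        try {
          // ... submit the job and run the eventually { ... } checks as above ...
        } finally {
          // Cancel the sleeping tasks once the conditions are met, instead of
          // relying on a fixed 1s sleep elapsing.
          sc.cancelAllJobs()
        }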

