Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/22001#discussion_r209294652
--- Diff: core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala ---
@@ -38,4 +46,83 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
assert(smaller.size === 4)
}
+ test("compute max number of concurrent tasks can be launched") {
+ val conf = new SparkConf()
+ .setMaster("local-cluster[4, 3, 1024]")
+ .setAppName("test")
+ sc = new SparkContext(conf)
+ eventually(timeout(10.seconds)) {
+ // Ensure all executors have been launched.
+ assert(sc.getExecutorIds().length == 4)
+ }
+ assert(sc.maxNumConcurrentTasks() == 12)
+ }
+
+ test("compute max number of concurrent tasks can be launched when
spark.task.cpus > 1") {
+ val conf = new SparkConf()
+ .set("spark.task.cpus", "2")
+ .setMaster("local-cluster[4, 3, 1024]")
+ .setAppName("test")
+ sc = new SparkContext(conf)
+ eventually(timeout(10.seconds)) {
+ // Ensure all executors have been launched.
+ assert(sc.getExecutorIds().length == 4)
+ }
+ // Each executor can only launch one task since `spark.task.cpus` is 2.
+ assert(sc.maxNumConcurrentTasks() == 4)
+ }
+
+ test("compute max number of concurrent tasks can be launched when some
executors are busy") {
+ val conf = new SparkConf()
+ .set("spark.task.cpus", "2")
+ .setMaster("local-cluster[4, 3, 1024]")
+ .setAppName("test")
+ sc = new SparkContext(conf)
+ val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
+ Thread.sleep(1000)
+ iter
+ }
+ val taskStarted = new AtomicBoolean(false)
+ val taskEnded = new AtomicBoolean(false)
+ val listener = new SparkListener() {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ taskStarted.set(true)
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ taskEnded.set(true)
+ }
+ }
+
+ try {
+ sc.addSparkListener(listener)
+ eventually(timeout(10.seconds)) {
+ // Ensure all executors have been launched.
+ assert(sc.getExecutorIds().length == 4)
+ }
+
+ // Submit a job to trigger some tasks on active executors.
+ testSubmitJob(sc, rdd)
+
+ eventually(timeout(5.seconds)) {
--- End diff --
Maybe it would be safer to let the task sleep longer and cancel it once the
conditions are met.
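
A minimal sketch of that approach (assuming the scaffolding already in this
test: `sc`, `testSubmitJob`, and the `taskStarted` flag; the 60-second sleep
and the job group id are illustrative, not part of this PR):

    // Let the tasks sleep far longer than the test needs, so the executors
    // stay busy for the whole assertion window.
    val rdd = sc.parallelize(1 to 10, 4).mapPartitions { iter =>
      Thread.sleep(60000)
      iter
    }

    // Tag the job with a group id so it can be cancelled later;
    // interruptOnCancel interrupts the sleeping task threads.
    sc.setJobGroup("max-concurrent-tasks", "busy executors", interruptOnCancel = true)
    testSubmitJob(sc, rdd)

    eventually(timeout(10.seconds)) {
      // The conditions under test go here (the `taskStarted` flag plus the
      // maxNumConcurrentTasks assertions from the elided part of the diff).
      assert(taskStarted.get())
    }

    // Once the conditions are met, cancel the long-running tasks instead of
    // waiting out the sleep.
    sc.cancelJobGroup("max-concurrent-tasks")

That way the assertions never race against a fixed 1-second sleep.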
---