Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/19194#discussion_r139819360
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
}
+ test("limit max concurrent running tasks in a job group when configured
") {
+ val conf = new SparkConf().
+ set(config.BLACKLIST_ENABLED, true).
+ set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max
concurrent tasks to 2
+
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2",
"host2"))
+ val props = new Properties();
+ props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") //
set the job group
+
+ val tasks = Array.tabulate[Task[_]](10) { i =>
+ new FakeTask(0, i, Nil)
+ }
+ val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0,
props), 2)
+
+ // make some offers to our taskset
+ var taskDescs = Seq(
+ "exec1" -> "host1",
+ "exec2" -> "host1"
+ ).flatMap { case (exec, host) =>
+ // offer each executor twice (simulating 2 cores per executor)
+ (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host,
TaskLocality.ANY)}
+ }
+ assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up
to maxConcurrentTasks.
+
+ // make 4 more offers
+ val taskDescs2 = Seq(
+ "exec1" -> "host1",
+ "exec2" -> "host1"
+ ).flatMap { case (exec, host) =>
+ (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host,
TaskLocality.ANY)}
+ }
+ assert(taskDescs2.size === 0) // tsm doesn't accept any as it is
already running at max tasks
+
+ // inform tsm that one task has completed
+ val directTaskResult = createTaskResult(0)
+ tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+ // make 4 more offers after previous task completed
+ taskDescs = Seq(
+ "exec1" -> "host1",
+ "exec2" -> "host1"
+ ).flatMap { case (exec, host) =>
+ (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host,
TaskLocality.ANY)}
+ }
+ assert(taskDescs.size === 1) // tsm accepts one as it can run one more
task
+ }
+
+ test("do not limit max concurrent running tasks in a job group by
default") {
+ val conf = new SparkConf().
+ set(config.BLACKLIST_ENABLED, true)
+
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2",
"host2"))
+
+ val tasks = Array.tabulate[Task[_]](10) { i =>
+ new FakeTask(0, i, Nil)
+ }
+ val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null),
2)
+
+ // make 5 offers to our taskset
+ var taskDescs = Seq(
+ "exec1" -> "host1",
+ "exec2" -> "host2"
+ ).flatMap { case (exec, host) =>
+ // offer each executor twice (simulating 3 cores per executor)
--- End diff ---
Please update these comments to match the code: "5 offers" -> "6 offers", and "twice" -> "three times".
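For example, assuming the loop under this comment really does offer each executor three times (the `(0 until 3)` line is cut off in the hunk above), the updated comments would read something like:

    // make 6 offers to our taskset
    ...
    // offer each executor three times (simulating 3 cores per executor)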