Github user dhruve commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19194#discussion_r140122744
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
         assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
       }
     
    +  test("limit max concurrent running tasks in a job group when configured 
") {
    +    val conf = new SparkConf().
    +      set(config.BLACKLIST_ENABLED, true).
    +      set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
    +
    +    sc = new SparkContext("local", "test", conf)
    +    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
    +    val props = new Properties();
    +    props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
    +
    +    val tasks = Array.tabulate[Task[_]](10) { i =>
    +      new FakeTask(0, i, Nil)
    +    }
    +    val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
    +
    +    // make some offers to our taskset
    +    var taskDescs = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      // offer each executor twice (simulating 2 cores per executor)
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
    +    }
    +    assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
    +
    +    // make 4 more offers
    +    val taskDescs2 = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
    +    }
    +    assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
    +
    +    // inform tsm that one task has completed
    +    val directTaskResult = createTaskResult(0)
    +    tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
    +
    +    // make 4 more offers after previous task completed
    +    taskDescs = Seq(
    +      "exec1" -> "host1",
    +      "exec2" -> "host1"
    +    ).flatMap { case (exec, host) =>
    +      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
    +    }
    +    assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
    +  }
    +
    +  test("do not limit max concurrent running tasks in a job group by 
default") {
    --- End diff --
    
    The previous test covers all the necessary checks introduced by the change. 
I added this one to cover the default scenario, when no job group is specified. 
We can do away with it if you prefer.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to