wbo4958 commented on code in PR #43494:
URL: https://github.com/apache/spark/pull/43494#discussion_r1503476690
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2283,4 +2295,425 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext
taskScheduler.handleFailedTask(tsm, tid, state, reason)
}
+  /**
+   * Implicit test helper: maps every fractional amount in `resources` to its internal Long
+   * form via `ResourceAmountUtils.toInternalResource`, leaving the keys untouched.
+   */
+  private implicit def toInternalResource(resources: Map[String, Double]): Map[String, Long] =
+    for ((address, amount) <- resources)
+      yield address -> ResourceAmountUtils.toInternalResource(amount)
+
+  // 1 executor with 4 GPUS.
+  // Registers one test per (barrierMode, taskNum) pair. Each task asks for 1/taskNum of a GPU
+  // through the default resource profile, so exactly `taskNum` tasks must share each GPU.
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount =
+        ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        // NOTE(review): the configured executor amount is 1 while the offer below advertises
+        // 4 GPU addresses -- confirm this mismatch is intended.
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        // The barrier variant submits exactly the 4 * taskNum tasks the assertions below
+        // expect to be launched; the plain variant submits 100 tasks, which is always more
+        // than 4 * taskNum (at most 80).
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        } else {
+          FakeTask.createTaskSet(100)
+        }
+
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers =
+          IndexedSeq(WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        // Consecutive groups of `taskNum` tasks are expected on GPU "0", "1", "2", "3" in turn.
+        for (taskId <- 0 until 4 * taskNum) {
+          val expectedGpu = (taskId / taskNum).toString
+          assert(ArrayBuffer(expectedGpu) ===
+            taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU.
+  // Registers one test per (barrierMode, taskNum) pair. Each task asks for 1/taskNum of a GPU
+  // through the default resource profile, so every executor's GPU must end up with exactly
+  // `taskNum` tasks.
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount =
+        ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        // The barrier variant submits exactly the 4 * taskNum tasks the assertions below
+        // expect to be launched; the plain variant submits 100 tasks, which is always more
+        // than 4 * taskNum (at most 80).
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        } else {
+          FakeTask.createTaskSet(100)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("0" -> 1.0))))),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("1" -> 1.0))))),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("2" -> 1.0))))),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0))))))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements
+
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        // Count how many tasks landed on each GPU address.
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          assignedGpus(addr) = assignedGpus.getOrElse(addr, 0) + 1
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
+  // 1 executor with 4 GPUS.
+  // Registers one test per (barrierMode, taskNum) pair. The task set uses a TaskResourceProfile
+  // requesting 1/taskNum of a GPU (the default profile is configured with 0.1), so exactly
+  // `taskNum` tasks must share each GPU.
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount =
+        ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        // The barrier variant submits exactly the 4 * taskNum tasks the assertions below
+        // expect to be launched; the plain variant submits 100 tasks, which is always more
+        // than 4 * taskNum (at most 80).
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources, rp.id)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        // Consecutive groups of `taskNum` tasks are expected on GPU "0", "1", "2", "3" in turn.
+        for (taskId <- 0 until 4 * taskNum) {
+          val expectedGpu = (taskId / taskNum).toString
+          assert(ArrayBuffer(expectedGpu) ===
+            taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU.
+  // Registers one test per (barrierMode, taskNum) pair. The task set uses a TaskResourceProfile
+  // requesting 1/taskNum of a GPU (the default profile is configured with 0.1), so every
+  // executor's GPU must end up with exactly `taskNum` tasks.
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount =
+        ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "1",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        // The barrier variant submits exactly the 4 * taskNum tasks the assertions below
+        // expect to be launched; the plain variant submits 100 tasks, which is always more
+        // than 4 * taskNum (at most 80).
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        }
+
+        // Each offer's address matches its host, as in the default-profile variant of this test.
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("0" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("1" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("2" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0)))),
+              rp.id)
+          )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements
+
+        val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        // Count how many tasks landed on each GPU address.
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          assignedGpus(addr) = assignedGpus.getOrElse(addr, 0) + 1
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
+  // One executor with 4 GPUs serves two task sets: the task set bound to a TaskResourceProfile
+  // (0.7 gpu per task) and the default-profile task set (0.3 gpu per task). The assertions
+  // check that the 0.3 remaining on each GPU is handed to the default-profile tasks.
+  test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 executor " +
+    "can assign to other taskset") {
+    val taskCpus = 1
+    val taskGpus = 0.3
+    val executorGpus = 4
+    val executorCpus = 1000
+
+    // each task requires 0.3 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task requires 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", executorCpus, None, new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // should have 4 tasks for the additional resource profile (0.7 gpu each, one per GPU)
+    // followed by 4 tasks for the default profile (0.3 gpu each, one per GPU)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(8 === taskDescriptions.length)
+    for ((tDesc, index) <- taskDescriptions.zipWithIndex) {
+      assert(tDesc.resources.contains(GPU))
+      val gpuAmounts = tDesc.resources(GPU)
+      val addresses = gpuAmounts.keys.toArray.sorted
+      assert(addresses.length == 1)
+      // the first 4 tasks will grab 0.7 gpu, the last 4 take the remaining 0.3 of each GPU
+      val (expectedAddress, expectedAmount) =
+        if (index < 4) (index.toString, 0.7) else ((index - 4).toString, 0.3)
+      assert(addresses(0) == expectedAddress)
+      assert(ResourceAmountUtils.toFractionalResource(
+        gpuAmounts(expectedAddress)) == expectedAmount)
+    }
+  }
+
+ test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple
executors " +
+ "can assign to other taskset") {
+ val taskCpus = 1
+ val taskGpus = 0.3
+ val executorGpus = 4
+ val executorCpus = 1000
Review Comment:
Thanks for the improvement.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]