dongjoon-hyun commented on code in PR #43494:
URL: https://github.com/apache/spark/pull/43494#discussion_r1503368176


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala:
##########
@@ -2283,4 +2295,425 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskScheduler.handleFailedTask(tsm, tid, state, reason)
   }
 
+  private implicit def toInternalResource(resources: Map[String, Double]): 
Map[String, Long] =
+    resources.map { case (k, v) => k -> 
ResourceAmountUtils.toInternalResource(v) }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can 
" +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        }
+
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0))))
+
+        val workerOffers =
+          IndexedSeq(WorkerOffer("executor0", "host0", executorCpus, 
Some("host0"), resources))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        var gpuAddress = -1
+        for (taskId <- 0 until 4 * taskNum) {
+          if (taskId % taskNum == 0) {
+            gpuAddress += 1
+          }
+          assert(ArrayBuffer(gpuAddress.toString) ===
+            
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can 
" +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("0" -> 1.0))))),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("1" -> 1.0))))),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("2" -> 1.0))))),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("3" -> 1.0))))))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements
+
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          if (!assignedGpus.contains(addr)) {
+            assignedGpus(addr) = 1
+          } else {
+            assignedGpus(addr) += 1
+          }
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with 
task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), 
resources, rp.id)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        var gpuAddress = -1
+        for (taskId <- 0 until 4 * taskNum) {
+          if (taskId % taskNum == 0) {
+            gpuAddress += 1
+          }
+          assert(ArrayBuffer(gpuAddress.toString) ===
+            
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with 
task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "1",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("0" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("1" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("2" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host4"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("3" -> 1.0)))),
+              rp.id)
+          )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements
+
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          if (!assignedGpus.contains(addr)) {
+            assignedGpus(addr) = 1
+          } else {
+            assignedGpus(addr) += 1
+          }
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
+  test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 
executor " +
+    "can assign to other taskset") {
+    val taskCpus = 1
+    val taskGpus = 0.3
+    val executorGpus = 4
+    val executorCpus = 1000
+
+    // each tasks require 0.3 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task require 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, 
stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // should have 3 for default profile and 2 for additional resource profile
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(8 === taskDescriptions.length)
+    var index = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      if (index < 4) { // the first 4 tasks will grab 0.7 gpu
+        assert(addresses(0) == index.toString)
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+      } else {
+        assert(addresses(0) == (index - 4).toString)
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3)
+      }
+      index += 1
+    }
+  }
+
+  test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple 
executors " +
+    "can assign to other taskset") {
+    val taskCpus = 1
+    val taskGpus = 0.3
+    val executorGpus = 4
+    val executorCpus = 1000

Review Comment:
   1000 threads seem to be too many for some CI systems with limited 
resources.
   - 
https://github.com/apache/spark/actions/workflows/build_maven_java21_macos14.yml
     - https://github.com/apache/spark/actions/runs/8054862135/job/22000403549
   ```
   Warning: [766.327s][warning][os,thread] Failed to start thread "Unknown 
thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 4096k, 
guardsize: 16k, detached.
   Warning: [766.327s][warning][os,thread] Failed to start the native thread 
for java.lang.Thread "dispatcher-event-loop-840"
   *** RUN ABORTED ***
   An exception or error caused a run to abort: unable to create native thread: 
possibly out of memory or process/resource limits reached 
     java.lang.OutOfMemoryError: unable to create native thread: possibly out 
of memory or process/resource limits reached
   ```
   
   I opened a follow-up PR to fix this test case.
   - https://github.com/apache/spark/pull/45264



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to