This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 96b301ce4e41 [SPARK-47498][TESTS][CORE] Refine some GPU fraction 
calculation tests
96b301ce4e41 is described below

commit 96b301ce4e414b575352c431c31412310c7f168b
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Wed Mar 27 11:47:51 2024 +0800

    [SPARK-47498][TESTS][CORE] Refine some GPU fraction calculation tests
    
    ### What changes were proposed in this pull request?
    
    This PR refines some fractional GPU resource calculation tests.
    
    ### Why are the changes needed?
    
    This PR adds more comments to the tests and refines some assertion blocks.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    The existing CI passes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45631 from wbo4958/refine-tests.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
---
 .../scheduler/ExecutorResourcesAmountsSuite.scala  |  24 +--
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 210 +++++++++++----------
 2 files changed, 121 insertions(+), 113 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
index 75a772dcdec8..1c5cb041ad6c 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
@@ -53,7 +53,7 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite 
with ExecutorResourceU
     // assign nothing to rp without resource profile
     val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
     assert(assigned.isDefined)
-    assigned.foreach { case resource => assert(resource.isEmpty) }
+    assigned.foreach(resource => assert(resource.isEmpty))
   }
 
   test("Convert ExecutorResourceInfos to ExecutorResourcesAmounts") {
@@ -187,8 +187,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite 
with ExecutorResourceU
     val rp = new ResourceProfileBuilder().require(treqs).build()
 
     var assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
-    assert(!assigned.isEmpty)
-    assigned.foreach { case resource => assert(!resource.isEmpty)}
+    assert(assigned.isDefined)
+    assigned.foreach(resource => assert(resource.nonEmpty))
 
     val treqs1 = new TaskResourceRequests()
       .resource("gpu", gpuTaskAmount)
@@ -270,8 +270,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite 
with ExecutorResourceU
 
     // taskMount = 0.1 < 1.0 which can be assigned.
     val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
-    assert(!assigned.isEmpty)
-    assigned.foreach { case resource =>
+    assert(assigned.isDefined)
+    assigned.foreach { resource =>
       assert(resource.size === 1)
       assert(resource.keys.toSeq === Seq("gpu"))
       assert(resource("gpu").size === 1)
@@ -337,8 +337,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite 
with ExecutorResourceU
 
     // taskMount = 0.1 < 1.0 which can be assigned.
     val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
-    assert(!assigned.isEmpty)
-    assigned.foreach { case resourceAmounts =>
+    assert(assigned.isDefined)
+    assigned.foreach { resourceAmounts =>
       assert(resourceAmounts.size === 2)
       assert(resourceAmounts.keys.toSeq.sorted === Seq("gpu", "fpga").sorted)
 
@@ -402,17 +402,17 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite 
with ExecutorResourceU
       val treqs = new TaskResourceRequests().resource("gpu", taskAmount)
       val rp = new ResourceProfileBuilder().require(treqs).build()
       val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
-      assert(!assigned.isEmpty)
-      assigned.foreach { case resources =>
+      assert(assigned.isDefined)
+      assigned.foreach { resources =>
         assert(
-          
resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource(_))
+          
resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource)
           === expectedAssignedAmount.sorted)
 
         availableExecResAmounts.acquire(resources)
 
         val leftRes = availableExecResAmounts.availableResources
         assert(leftRes.size == 1)
-        assert(leftRes.keys.toSeq(0) == "gpu")
+        assert(leftRes.keys.toSeq.head == "gpu")
         assert(compareMaps(leftRes("gpu"), expectedLeftRes))
       }
     }
@@ -424,7 +424,7 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite 
with ExecutorResourceU
 
       val leftRes = availableExecResAmounts.availableResources
       assert(leftRes.size == 1)
-      assert(leftRes.keys.toSeq(0) == "gpu")
+      assert(leftRes.keys.toSeq.head == "gpu")
       assert(compareMaps(leftRes("gpu"), expectedLeftRes))
     }
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index d9db5f656176..5cc97410bcce 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -2302,13 +2302,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
         taskDescriptions = taskDescriptions.sortBy(t => t.index)
         assert(4 * taskNum === taskDescriptions.length)
         assert(!failedTaskSet)
-        var gpuAddress = -1
-        for (taskId <- 0 until 4 * taskNum) {
-          if (taskId % taskNum == 0) {
-            gpuAddress += 1
-          }
-          assert(ArrayBuffer(gpuAddress.toString) ===
-            
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        for (task <- taskDescriptions) {
+          // Spark offers the GPU resources in an ascending order starting 
from the lowest
+          // GPU addresses and turns around to the next address until the 
current one is exhausted.
+          assert(task.resources(GPU).keys.toArray === Array((task.index / 
taskNum).toString))
         }
       }
     }
@@ -2331,12 +2328,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
           EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
           config.EXECUTOR_CORES.key -> executorCpus.toString)
 
-        val taskSet = if (barrierMode) {
-          FakeTask.createBarrierTaskSet(4 * taskNum)
-        } else {
-          FakeTask.createTaskSet(100)
-        }
-
         val workerOffers =
           IndexedSeq(
             WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
@@ -2348,22 +2339,24 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
             WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
               new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("3" -> 1.0))))))
 
+        val executorNumber = workerOffers.length
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(executorNumber * taskNum)
+        } else {
+          FakeTask.createTaskSet(100)
+        }
+
         taskScheduler.submitTasks(taskSet)
         // Launch tasks on executor that satisfies resource requirements
-
         val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
-        assert(4 * taskNum === taskDescriptions.length)
+        assert(executorNumber * taskNum === taskDescriptions.length)
         assert(!failedTaskSet)
-        val assignedGpus: HashMap[String, Int] = HashMap.empty
-        for (taskId <- 0 until 4 * taskNum) {
-          val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
-          assert(gpus.length == 1)
-          val addr = gpus(0)
-          if (!assignedGpus.contains(addr)) {
-            assignedGpus(addr) = 1
-          } else {
-            assignedGpus(addr) += 1
-          }
+        val assignedGpus: mutable.HashMap[String, Int] = mutable.HashMap.empty
+        taskDescriptions.foreach { task =>
+          val addresses = task.resources(GPU).keys.toArray
+          assert(addresses.length == 1)
+          assignedGpus.update(addresses(0), 
assignedGpus.getOrElseUpdate(addresses(0), 0) + 1)
         }
         assert(assignedGpus.toMap ===
           Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
@@ -2408,13 +2401,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
         taskDescriptions = taskDescriptions.sortBy(t => t.index)
         assert(4 * taskNum === taskDescriptions.length)
         assert(!failedTaskSet)
-        var gpuAddress = -1
-        for (taskId <- 0 until 4 * taskNum) {
-          if (taskId % taskNum == 0) {
-            gpuAddress += 1
-          }
-          assert(ArrayBuffer(gpuAddress.toString) ===
-            
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        for (task <- taskDescriptions) {
+          // Spark offers the GPU resources in an ascending order starting 
from the lowest
+          // GPU addresses and turns around to the next address until the 
current one is exhausted.
+          assert(task.resources(GPU).keys.toArray === Array((task.index / 
taskNum).toString))
         }
       }
     }
@@ -2439,12 +2429,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
         val rp = new TaskResourceProfile(treqs.requests)
         taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
 
-        val taskSet = if (barrierMode) {
-          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
-        } else {
-          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
-        }
-
         val workerOffers =
           IndexedSeq(
             WorkerOffer("executor0", "host0", executorCpus, Some("host1"),
@@ -2461,22 +2445,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
               rp.id)
           )
 
+        val executorNumber = workerOffers.length
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createBarrierTaskSet(executorNumber * taskNum, 0, 1, 1, 
rp.id)
+        } else {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        }
+
         taskScheduler.submitTasks(taskSet)
         // Launch tasks on executor that satisfies resource requirements
 
         val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
-        assert(4 * taskNum === taskDescriptions.length)
+        assert(executorNumber * taskNum === taskDescriptions.length)
         assert(!failedTaskSet)
-        val assignedGpus: HashMap[String, Int] = HashMap.empty
-        for (taskId <- 0 until 4 * taskNum) {
-          val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
-          assert(gpus.length == 1)
-          val addr = gpus(0)
-          if (!assignedGpus.contains(addr)) {
-            assignedGpus(addr) = 1
-          } else {
-            assignedGpus(addr) += 1
-          }
+
+        val assignedGpus: mutable.HashMap[String, Int] = mutable.HashMap.empty
+        taskDescriptions.foreach { task =>
+          val addresses = task.resources(GPU).keys.toArray
+          assert(addresses.length == 1)
+          assignedGpus.update(addresses(0), 
assignedGpus.getOrElseUpdate(addresses(0), 0) + 1)
         }
         assert(assignedGpus.toMap ===
           Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
@@ -2487,22 +2475,23 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
   test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 
executor " +
     "can assign to other taskset") {
     val taskCpus = 1
-    val taskGpus = 0.3
+    val lowerGpuTaskAmount = 0.3
     val executorGpus = 4
-    val executorCpus = 50
+    val executorCpus = 20
 
     // each tasks require 0.3 gpu
     val taskScheduler = setupScheduler(numCores = executorCpus,
       config.CPUS_PER_TASK.key -> taskCpus.toString,
-      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      TASK_GPU_ID.amountConf -> lowerGpuTaskAmount.toString,
       EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
       config.EXECUTOR_CORES.key -> executorCpus.toString
     )
-    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
-      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val lowerTaskSet = FakeTask.createTaskSet(100, stageId = 1, stageAttemptId 
= 0,
+      priority = 1, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // each task require 0.7 gpu
-    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val higherGpuTaskAmount = 0.7
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
higherGpuTaskAmount)
     val rp = new TaskResourceProfile(treqs.requests)
     taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
 
@@ -2519,22 +2508,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskScheduler.submitTasks(lowerTaskSet)
     taskScheduler.submitTasks(higherRpTaskSet)
 
-    // should have 3 for default profile and 2 for additional resource profile
+    // Initially, offer the available resources to the higher priority task 
set, where each task
+    //      requires 0.7 GPU, so Spark can assign the GPU resources to a 
maximum of 4 tasks, leaving
+    //      0.3 GPU available for each remaining tasks.
+    // Secondly, try to offer the available resources to the lower priority 
task set, where each
+    //      task requires 0.3 GPU, so the left available GPU resources can be 
offered to a maximum
+    //      of 4 lower priority tasks.
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(8 === taskDescriptions.length)
     var index = 0
     for (tDesc <- taskDescriptions) {
       assert(tDesc.resources.contains(GPU))
-      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      val addresses = tDesc.resources(GPU).keys.toArray.sorted
       assert(addresses.length == 1)
+      assert(addresses(0) == tDesc.index.toString)
       if (index < 4) { // the first 4 tasks will grab 0.7 gpu
-        assert(addresses(0) == index.toString)
         assert(ResourceAmountUtils.toFractionalResource(
-          tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+          tDesc.resources(GPU)(tDesc.index.toString)) == higherGpuTaskAmount)
       } else {
-        assert(addresses(0) == (index - 4).toString)
         assert(ResourceAmountUtils.toFractionalResource(
-          tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3)
+          tDesc.resources(GPU)(tDesc.index.toString)) == lowerGpuTaskAmount)
       }
       index += 1
     }
@@ -2543,14 +2536,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
   test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple 
executors " +
     "can assign to other taskset") {
     val taskCpus = 1
-    val taskGpus = 0.3
+    val lowerTaskGpuAmount = 0.3
     val executorGpus = 4
-    val executorCpus = 50
+    val executorCpus = 20
 
     // each tasks require 0.3 gpu
     val taskScheduler = setupScheduler(numCores = executorCpus,
       config.CPUS_PER_TASK.key -> taskCpus.toString,
-      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      TASK_GPU_ID.amountConf -> lowerTaskGpuAmount.toString,
       EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
       config.EXECUTOR_CORES.key -> executorCpus.toString
     )
@@ -2558,7 +2551,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
       ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // each task require 0.7 gpu
-    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val higherTaskGpuAmount = 0.7
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
higherTaskGpuAmount)
     val rp = new TaskResourceProfile(treqs.requests)
     taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
 
@@ -2568,21 +2562,36 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val workerOffers =
       IndexedSeq(
         // cpu won't be a problem
-        WorkerOffer("executor0", "host0", 50, None, new 
ExecutorResourcesAmounts(
+        WorkerOffer("executor0", "host0", executorCpus, None, new 
ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("0" -> 1.0))))),
-        WorkerOffer("executor1", "host1", 50, None, new 
ExecutorResourcesAmounts(
+        WorkerOffer("executor1", "host1", executorCpus, None, new 
ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("1" -> 1.0))))),
-        WorkerOffer("executor2", "host2", 50, None, new 
ExecutorResourcesAmounts(
+        WorkerOffer("executor2", "host2", executorCpus, None, new 
ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("2" -> 1.0))))),
-        WorkerOffer("executor3", "host3", 50, None, new 
ExecutorResourcesAmounts(
+        WorkerOffer("executor3", "host3", executorCpus, None, new 
ExecutorResourcesAmounts(
           Map(GPU -> toInternalResource(Map("3" -> 1.0)))))
       )
 
     taskScheduler.submitTasks(lowerTaskSet)
     taskScheduler.submitTasks(higherRpTaskSet)
 
-    // should have 3 for default profile and 2 for additional resource profile
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    // Initially, offer the available resources to the higher priority task 
set, where each task
+    //      requires 0.7 GPU, so Spark can assign the GPU resources to a 
maximum of 4 tasks, leaving
+    //      0.3 GPU available for each remaining tasks.
+    // Secondly, try to offer the available resources to the lower priority 
task set, where each
+    //      task requires 0.3 GPU, so the left available GPU resources can be 
offered to a maximum
+    //      of 4 lower priority tasks.
+    var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    // The original taskDescriptions is like below layout
+    //    Executor 0,       executor 1,          executor 2,      executor 3
+    // index 0,  index 0,  index 1, index 1   index 2, index 2,  index 3, 
index 3
+    // task_0.7, task_03  task_0.7, task_03  task_0.7, task_03  task_0.7, 
task_03
+    // After sorting, it should be
+    //   Executor 0,       executor 1,          executor 2,      executor 3
+    //  index 0             index 1               index 2         index 3
+    // task_0.7,            task_0.7             task_0.7,       task_0.7
+    // task_03,             task_03,              task_03,       task_03
+    taskDescriptions = taskDescriptions.sortBy(_.taskId)
     assert(8 === taskDescriptions.length)
 
     var index = 0
@@ -2591,32 +2600,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
 
     for (tDesc <- taskDescriptions) {
       assert(tDesc.resources.contains(GPU))
-      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      val addresses = tDesc.resources(GPU).keys.toArray.sorted
       assert(addresses.length == 1)
       val address = addresses(0)
 
-      //    Executor 0,       executor 1,           executor 2,      executor 3
-      // task_0.7, task_03  task_0.7, task_03  task_0.7, task_03  task_0.7, 
task_03
-      if (index % 2 == 0) {
-        higherAssignedExecutorsGpus.append(
-          (tDesc.executorId, address))
-        assert(ResourceAmountUtils.toFractionalResource(
-          tDesc.resources.get(GPU).get(address)) == 0.7)
+
+      if (index / 4 == 0) {
+        higherAssignedExecutorsGpus.append((tDesc.executorId, address))
+        
assert(ResourceAmountUtils.toFractionalResource(tDesc.resources(GPU)(address)) 
==
+          higherTaskGpuAmount)
       } else {
-        lowerAssignedExecutorsGpus.append(
-          (tDesc.executorId, address))
-        assert(ResourceAmountUtils.toFractionalResource(
-          tDesc.resources.get(GPU).get(address)) == 0.3)
+        lowerAssignedExecutorsGpus.append((tDesc.executorId, address))
+        
assert(ResourceAmountUtils.toFractionalResource(tDesc.resources(GPU)(address)) 
==
+          lowerTaskGpuAmount)
       }
       index += 1
     }
 
-    assert(higherAssignedExecutorsGpus.sorted sameElements
-      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), 
("executor3", "3")
-      ))
-    assert(lowerAssignedExecutorsGpus.sorted sameElements
-      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), 
("executor3", "3")
-      ))
+    assert(higherAssignedExecutorsGpus.sorted ==
+      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), 
("executor3", "3")))
+    assert(lowerAssignedExecutorsGpus.sorted ==
+      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), 
("executor3", "3")))
   }
 
   test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 
executor " +
@@ -2626,7 +2630,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val executorGpus = 4
     val executorCpus = 4
 
-    // each tasks require 0.3 gpu
+    // each tasks require 0.4 gpu
     val taskScheduler = setupScheduler(numCores = executorCpus,
       config.CPUS_PER_TASK.key -> taskCpus.toString,
       TASK_GPU_ID.amountConf -> taskGpus.toString,
@@ -2636,8 +2640,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
       ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
-    // each task require 0.7 gpu
-    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    // each task requires 0.7 gpu
+    val higherTaskGpuAmount = 0.7
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
higherTaskGpuAmount)
     val rp = new TaskResourceProfile(treqs.requests)
     taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
 
@@ -2654,18 +2659,21 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskScheduler.submitTasks(lowerTaskSet)
     taskScheduler.submitTasks(higherRpTaskSet)
 
-    // only offer the resources to the higher taskset
+    // Initially, offer the available resources to the higher priority task 
set, where each task
+    //      requires 0.7 GPU, so Spark can assign the GPU resources to a 
maximum of 4 tasks, leaving
+    //      0.3 GPU available for each remaining tasks.
+    // Secondly, try to offer the available resources to the lower priority 
task set, where each
+    //      task requires 0.4 GPU, while only 0.3 gpu of each address is 
available, so couldn't
+    //      offer any resources for the lower tasks.
     val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
     assert(4 === taskDescriptions.length)
-    var index = 0
     for (tDesc <- taskDescriptions) {
       assert(tDesc.resources.contains(GPU))
-      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      val addresses = tDesc.resources(GPU).keys.toArray.sorted
       assert(addresses.length == 1)
-      assert(addresses(0) == index.toString)
+      assert(addresses(0) == tDesc.index.toString)
       assert(ResourceAmountUtils.toFractionalResource(
-        tDesc.resources.get(GPU).get(index.toString)) == 0.7)
-      index += 1
+        tDesc.resources(GPU)(tDesc.index.toString)) == higherTaskGpuAmount)
     }
   }
 
@@ -2677,9 +2685,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(
-      new WorkerOffer("executor0", "host0", numFreeCores, 
Some("192.168.0.101:49625")),
-      new WorkerOffer("executor1", "host1", numFreeCores, 
Some("192.168.0.101:49627")),
-      new WorkerOffer("executor2", "host2", numFreeCores, 
Some("192.168.0.101:49629")))
+      WorkerOffer("executor0", "host0", numFreeCores, 
Some("192.168.0.101:49625")),
+      WorkerOffer("executor1", "host1", numFreeCores, 
Some("192.168.0.101:49627")),
+      WorkerOffer("executor2", "host2", numFreeCores, 
Some("192.168.0.101:49629")))
     val attempt1 = FakeTask.createBarrierTaskSet(3)
 
     // submit attempt 1, offer some resources, all tasks get launched together


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to