This is an automated email from the ASF dual-hosted git repository. wuyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 96b301ce4e41 [SPARK-47498][TESTS][CORE] Refine some GPU fraction calculation tests 96b301ce4e41 is described below commit 96b301ce4e414b575352c431c31412310c7f168b Author: Bobby Wang <wbo4...@gmail.com> AuthorDate: Wed Mar 27 11:47:51 2024 +0800 [SPARK-47498][TESTS][CORE] Refine some GPU fraction calculation tests ### What changes were proposed in this pull request? This PR refines some fractional GPU resource calculation tests. ### Why are the changes needed? This PR adds more comments to the tests and refines some assertion blocks. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The existing CI passes. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45631 from wbo4958/refine-tests. Authored-by: Bobby Wang <wbo4...@gmail.com> Signed-off-by: Yi Wu <yi...@databricks.com> --- .../scheduler/ExecutorResourcesAmountsSuite.scala | 24 +-- .../spark/scheduler/TaskSchedulerImplSuite.scala | 210 +++++++++++---------- 2 files changed, 121 insertions(+), 113 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala index 75a772dcdec8..1c5cb041ad6c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala @@ -53,7 +53,7 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU // assign nothing to rp without resource profile val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) assert(assigned.isDefined) - assigned.foreach { case resource => assert(resource.isEmpty) } + assigned.foreach(resource => assert(resource.isEmpty)) } test("Convert ExecutorResourceInfos to ExecutorResourcesAmounts") { @@ -187,8 +187,8 @@ class 
ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU val rp = new ResourceProfileBuilder().require(treqs).build() var assigned = availableExecResAmounts.assignAddressesCustomResources(rp) - assert(!assigned.isEmpty) - assigned.foreach { case resource => assert(!resource.isEmpty)} + assert(assigned.isDefined) + assigned.foreach(resource => assert(resource.nonEmpty)) val treqs1 = new TaskResourceRequests() .resource("gpu", gpuTaskAmount) @@ -270,8 +270,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU // taskMount = 0.1 < 1.0 which can be assigned. val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) - assert(!assigned.isEmpty) - assigned.foreach { case resource => + assert(assigned.isDefined) + assigned.foreach { resource => assert(resource.size === 1) assert(resource.keys.toSeq === Seq("gpu")) assert(resource("gpu").size === 1) @@ -337,8 +337,8 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU // taskMount = 0.1 < 1.0 which can be assigned. 
val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) - assert(!assigned.isEmpty) - assigned.foreach { case resourceAmounts => + assert(assigned.isDefined) + assigned.foreach { resourceAmounts => assert(resourceAmounts.size === 2) assert(resourceAmounts.keys.toSeq.sorted === Seq("gpu", "fpga").sorted) @@ -402,17 +402,17 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU val treqs = new TaskResourceRequests().resource("gpu", taskAmount) val rp = new ResourceProfileBuilder().require(treqs).build() val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) - assert(!assigned.isEmpty) - assigned.foreach { case resources => + assert(assigned.isDefined) + assigned.foreach { resources => assert( - resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource(_)) + resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource) === expectedAssignedAmount.sorted) availableExecResAmounts.acquire(resources) val leftRes = availableExecResAmounts.availableResources assert(leftRes.size == 1) - assert(leftRes.keys.toSeq(0) == "gpu") + assert(leftRes.keys.toSeq.head == "gpu") assert(compareMaps(leftRes("gpu"), expectedLeftRes)) } } @@ -424,7 +424,7 @@ class ExecutorResourcesAmountsSuite extends SparkFunSuite with ExecutorResourceU val leftRes = availableExecResAmounts.availableResources assert(leftRes.size == 1) - assert(leftRes.keys.toSeq(0) == "gpu") + assert(leftRes.keys.toSeq.head == "gpu") assert(compareMaps(leftRes("gpu"), expectedLeftRes)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index d9db5f656176..5cc97410bcce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -2302,13 +2302,10 @@ class TaskSchedulerImplSuite extends 
SparkFunSuite with LocalSparkContext taskDescriptions = taskDescriptions.sortBy(t => t.index) assert(4 * taskNum === taskDescriptions.length) assert(!failedTaskSet) - var gpuAddress = -1 - for (taskId <- 0 until 4 * taskNum) { - if (taskId % taskNum == 0) { - gpuAddress += 1 - } - assert(ArrayBuffer(gpuAddress.toString) === - taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted) + for (task <- taskDescriptions) { + // Spark offers the GPU resources in an ascending order starting from the lowest + // GPU addresses and turns around to the next address until the current one is exhausted. + assert(task.resources(GPU).keys.toArray === Array((task.index / taskNum).toString)) } } } @@ -2331,12 +2328,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, config.EXECUTOR_CORES.key -> executorCpus.toString) - val taskSet = if (barrierMode) { - FakeTask.createBarrierTaskSet(4 * taskNum) - } else { - FakeTask.createTaskSet(100) - } - val workerOffers = IndexedSeq( WorkerOffer("executor0", "host0", executorCpus, Some("host0"), @@ -2348,22 +2339,24 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext WorkerOffer("executor3", "host3", executorCpus, Some("host3"), new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0)))))) + val executorNumber = workerOffers.length + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(executorNumber * taskNum) + } else { + FakeTask.createTaskSet(100) + } + taskScheduler.submitTasks(taskSet) // Launch tasks on executor that satisfies resource requirements - val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten - assert(4 * taskNum === taskDescriptions.length) + assert(executorNumber * taskNum === taskDescriptions.length) assert(!failedTaskSet) - val assignedGpus: HashMap[String, Int] = HashMap.empty - for (taskId <- 0 until 4 * taskNum) { - val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted - assert(gpus.length == 1) - val addr = gpus(0) - if (!assignedGpus.contains(addr)) { - assignedGpus(addr) = 1 - } else { - assignedGpus(addr) += 1 - } + val assignedGpus: mutable.HashMap[String, Int] = mutable.HashMap.empty + taskDescriptions.foreach { task => + val addresses = task.resources(GPU).keys.toArray + assert(addresses.length == 1) + assignedGpus.update(addresses(0), assignedGpus.getOrElseUpdate(addresses(0), 0) + 1) } assert(assignedGpus.toMap === Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum)) @@ -2408,13 +2401,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskDescriptions.sortBy(t => t.index) assert(4 * taskNum === taskDescriptions.length) assert(!failedTaskSet) - var gpuAddress = -1 - for (taskId <- 0 until 4 * taskNum) { - if (taskId % taskNum == 0) { - gpuAddress += 1 - } - assert(ArrayBuffer(gpuAddress.toString) === - taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted) + for (task <- taskDescriptions) { + // Spark offers the GPU resources in an ascending order starting from the lowest + // GPU addresses and turns around to the next address until the current one is exhausted. 
+ assert(task.resources(GPU).keys.toArray === Array((task.index / taskNum).toString)) } } } @@ -2439,12 +2429,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val rp = new TaskResourceProfile(treqs.requests) taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) - val taskSet = if (barrierMode) { - FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id) - } else { - FakeTask.createTaskSet(100, 0, 1, 1, rp.id) - } - val workerOffers = IndexedSeq( WorkerOffer("executor0", "host0", executorCpus, Some("host1"), @@ -2461,22 +2445,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext rp.id) ) + val executorNumber = workerOffers.length + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(executorNumber * taskNum, 0, 1, 1, rp.id) + } else { + FakeTask.createTaskSet(100, 0, 1, 1, rp.id) + } + taskScheduler.submitTasks(taskSet) // Launch tasks on executor that satisfies resource requirements val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten - assert(4 * taskNum === taskDescriptions.length) + assert(executorNumber * taskNum === taskDescriptions.length) assert(!failedTaskSet) - val assignedGpus: HashMap[String, Int] = HashMap.empty - for (taskId <- 0 until 4 * taskNum) { - val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted - assert(gpus.length == 1) - val addr = gpus(0) - if (!assignedGpus.contains(addr)) { - assignedGpus(addr) = 1 - } else { - assignedGpus(addr) += 1 - } + + val assignedGpus: mutable.HashMap[String, Int] = mutable.HashMap.empty + taskDescriptions.foreach { task => + val addresses = task.resources(GPU).keys.toArray + assert(addresses.length == 1) + assignedGpus.update(addresses(0), assignedGpus.getOrElseUpdate(addresses(0), 0) + 1) } assert(assignedGpus.toMap === Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum)) @@ -2487,22 +2475,23 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext 
test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 executor " + "can assign to other taskset") { val taskCpus = 1 - val taskGpus = 0.3 + val lowerGpuTaskAmount = 0.3 val executorGpus = 4 - val executorCpus = 50 + val executorCpus = 20 // each tasks require 0.3 gpu val taskScheduler = setupScheduler(numCores = executorCpus, config.CPUS_PER_TASK.key -> taskCpus.toString, - TASK_GPU_ID.amountConf -> taskGpus.toString, + TASK_GPU_ID.amountConf -> lowerGpuTaskAmount.toString, EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, config.EXECUTOR_CORES.key -> executorCpus.toString ) - val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1, - ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + val lowerTaskSet = FakeTask.createTaskSet(100, stageId = 1, stageAttemptId = 0, + priority = 1, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) // each task require 0.7 gpu - val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + val higherGpuTaskAmount = 0.7 + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, higherGpuTaskAmount) val rp = new TaskResourceProfile(treqs.requests) taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) @@ -2519,22 +2508,26 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.submitTasks(lowerTaskSet) taskScheduler.submitTasks(higherRpTaskSet) - // should have 3 for default profile and 2 for additional resource profile + // Initially, offer the available resources to the higher priority task set, where each task + // requires 0.7 GPU, so Spark can assign the GPU resources to a maximum of 4 tasks, leaving + // 0.3 GPU available for each remaining tasks. + // Secondly, try to offer the available resources to the lower priority task set, where each + // task requires 0.3 GPU, so the left available GPU resources can be offered to a maximum + // of 4 lower priority tasks. 
val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten assert(8 === taskDescriptions.length) var index = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted + val addresses = tDesc.resources(GPU).keys.toArray.sorted assert(addresses.length == 1) + assert(addresses(0) == tDesc.index.toString) if (index < 4) { // the first 4 tasks will grab 0.7 gpu - assert(addresses(0) == index.toString) assert(ResourceAmountUtils.toFractionalResource( - tDesc.resources.get(GPU).get(index.toString)) == 0.7) + tDesc.resources(GPU)(tDesc.index.toString)) == higherGpuTaskAmount) } else { - assert(addresses(0) == (index - 4).toString) assert(ResourceAmountUtils.toFractionalResource( - tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3) + tDesc.resources(GPU)(tDesc.index.toString)) == lowerGpuTaskAmount) } index += 1 } @@ -2543,14 +2536,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple executors " + "can assign to other taskset") { val taskCpus = 1 - val taskGpus = 0.3 + val lowerTaskGpuAmount = 0.3 val executorGpus = 4 - val executorCpus = 50 + val executorCpus = 20 // each tasks require 0.3 gpu val taskScheduler = setupScheduler(numCores = executorCpus, config.CPUS_PER_TASK.key -> taskCpus.toString, - TASK_GPU_ID.amountConf -> taskGpus.toString, + TASK_GPU_ID.amountConf -> lowerTaskGpuAmount.toString, EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, config.EXECUTOR_CORES.key -> executorCpus.toString ) @@ -2558,7 +2551,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) // each task require 0.7 gpu - val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + val higherTaskGpuAmount = 0.7 + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, higherTaskGpuAmount) val rp = new 
TaskResourceProfile(treqs.requests) taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) @@ -2568,21 +2562,36 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val workerOffers = IndexedSeq( // cpu won't be a problem - WorkerOffer("executor0", "host0", 50, None, new ExecutorResourcesAmounts( + WorkerOffer("executor0", "host0", executorCpus, None, new ExecutorResourcesAmounts( Map(GPU -> toInternalResource(Map("0" -> 1.0))))), - WorkerOffer("executor1", "host1", 50, None, new ExecutorResourcesAmounts( + WorkerOffer("executor1", "host1", executorCpus, None, new ExecutorResourcesAmounts( Map(GPU -> toInternalResource(Map("1" -> 1.0))))), - WorkerOffer("executor2", "host2", 50, None, new ExecutorResourcesAmounts( + WorkerOffer("executor2", "host2", executorCpus, None, new ExecutorResourcesAmounts( Map(GPU -> toInternalResource(Map("2" -> 1.0))))), - WorkerOffer("executor3", "host3", 50, None, new ExecutorResourcesAmounts( + WorkerOffer("executor3", "host3", executorCpus, None, new ExecutorResourcesAmounts( Map(GPU -> toInternalResource(Map("3" -> 1.0))))) ) taskScheduler.submitTasks(lowerTaskSet) taskScheduler.submitTasks(higherRpTaskSet) - // should have 3 for default profile and 2 for additional resource profile - val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + // Initially, offer the available resources to the higher priority task set, where each task + // requires 0.7 GPU, so Spark can assign the GPU resources to a maximum of 4 tasks, leaving + // 0.3 GPU available for each remaining tasks. + // Secondly, try to offer the available resources to the lower priority task set, where each + // task requires 0.3 GPU, so the left available GPU resources can be offered to a maximum + // of 4 lower priority tasks. 
+ var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + // The original taskDescriptions is like below layout + // Executor 0, executor 1, executor 2, executor 3 + // index 0, index 0, index 1, index 1 index 2, index 2, index 3, index 3 + // task_0.7, task_03 task_0.7, task_03 task_0.7, task_03 task_0.7, task_03 + // After sorting, it should be + // Executor 0, executor 1, executor 2, executor 3 + // index 0 index 1 index 2 index 3 + // task_0.7, task_0.7 task_0.7, task_0.7 + // task_03, task_03, task_03, task_03 + taskDescriptions = taskDescriptions.sortBy(_.taskId) assert(8 === taskDescriptions.length) var index = 0 @@ -2591,32 +2600,27 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted + val addresses = tDesc.resources(GPU).keys.toArray.sorted assert(addresses.length == 1) val address = addresses(0) - // Executor 0, executor 1, executor 2, executor 3 - // task_0.7, task_03 task_0.7, task_03 task_0.7, task_03 task_0.7, task_03 - if (index % 2 == 0) { - higherAssignedExecutorsGpus.append( - (tDesc.executorId, address)) - assert(ResourceAmountUtils.toFractionalResource( - tDesc.resources.get(GPU).get(address)) == 0.7) + + if (index / 4 == 0) { + higherAssignedExecutorsGpus.append((tDesc.executorId, address)) + assert(ResourceAmountUtils.toFractionalResource(tDesc.resources(GPU)(address)) == + higherTaskGpuAmount) } else { - lowerAssignedExecutorsGpus.append( - (tDesc.executorId, address)) - assert(ResourceAmountUtils.toFractionalResource( - tDesc.resources.get(GPU).get(address)) == 0.3) + lowerAssignedExecutorsGpus.append((tDesc.executorId, address)) + assert(ResourceAmountUtils.toFractionalResource(tDesc.resources(GPU)(address)) == + lowerTaskGpuAmount) } index += 1 } - assert(higherAssignedExecutorsGpus.sorted sameElements - ArrayBuffer(("executor0", "0"), ("executor1", "1"), 
("executor2", "2"), ("executor3", "3") - )) - assert(lowerAssignedExecutorsGpus.sorted sameElements - ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3") - )) + assert(higherAssignedExecutorsGpus.sorted == + ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3"))) + assert(lowerAssignedExecutorsGpus.sorted == + ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3"))) } test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 executor " + @@ -2626,7 +2630,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val executorGpus = 4 val executorCpus = 4 - // each tasks require 0.3 gpu + // each tasks require 0.4 gpu val taskScheduler = setupScheduler(numCores = executorCpus, config.CPUS_PER_TASK.key -> taskCpus.toString, TASK_GPU_ID.amountConf -> taskGpus.toString, @@ -2636,8 +2640,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) - // each task require 0.7 gpu - val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + // each task requires 0.7 gpu + val higherTaskGpuAmount = 0.7 + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, higherTaskGpuAmount) val rp = new TaskResourceProfile(treqs.requests) taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) @@ -2654,18 +2659,21 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.submitTasks(lowerTaskSet) taskScheduler.submitTasks(higherRpTaskSet) - // only offer the resources to the higher taskset + // Initially, offer the available resources to the higher priority task set, where each task + // requires 0.7 GPU, so Spark can assign the GPU resources to a maximum of 4 tasks, leaving + // 0.3 GPU available for each remaining tasks. 
+ // Secondly, try to offer the available resources to the lower priority task set, where each + // task requires 0.4 GPU, while only 0.3 gpu of each address is available, so couldn't + // offer any resources for the lower tasks. val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten assert(4 === taskDescriptions.length) - var index = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted + val addresses = tDesc.resources(GPU).keys.toArray.sorted assert(addresses.length == 1) - assert(addresses(0) == index.toString) + assert(addresses(0) == tDesc.index.toString) assert(ResourceAmountUtils.toFractionalResource( - tDesc.resources.get(GPU).get(index.toString)) == 0.7) - index += 1 + tDesc.resources(GPU)(tDesc.index.toString)) == higherTaskGpuAmount) } } @@ -2677,9 +2685,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val numFreeCores = 3 val workerOffers = IndexedSeq( - new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")), - new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")), - new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629"))) + WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")), + WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")), + WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629"))) val attempt1 = FakeTask.createBarrierTaskSet(3) // submit attempt 1, offer some resources, all tasks get launched together --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org