GitHub user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/15218#discussion_r83562596
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -109,6 +109,72 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
+ test("Scheduler balance the assignment to the worker with more free
cores") {
+ val taskScheduler = setupScheduler(("spark.task.assigner",
classOf[BalancedAssigner].getName))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+ new WorkerOffer("executor1", "host1", 4))
+ val selectedExecutorIds = {
+ val taskSet = FakeTask.createTaskSet(2)
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions =
taskScheduler.resourceOffers(workerOffers).flatten
+ assert(2 === taskDescriptions.length)
+ taskDescriptions.map(_.executorId)
+ }
+ val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
+ assert(count == 2)
+ assert(!failedTaskSet)
+ }
+
+ test("Scheduler balance the assignment across workers with same free
cores") {
+ val taskScheduler = setupScheduler(("spark.task.assigner",
classOf[BalancedAssigner].getName))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+ new WorkerOffer("executor1", "host1", 2))
+ val selectedExecutorIds = {
+ val taskSet = FakeTask.createTaskSet(2)
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions =
taskScheduler.resourceOffers(workerOffers).flatten
+ assert(2 === taskDescriptions.length)
+ taskDescriptions.map(_.executorId)
+ }
+ val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
+ assert(count == 1)
+ assert(!failedTaskSet)
+ }
+
+ test("Scheduler packs the assignment to workers with less free cores") {
+ val taskScheduler = setupScheduler(("spark.task.assigner",
classOf[PackedAssigner].getName))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+ new WorkerOffer("executor1", "host1", 4))
+ val selectedExecutorIds = {
+ val taskSet = FakeTask.createTaskSet(2)
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions =
taskScheduler.resourceOffers(workerOffers).flatten
+ assert(2 === taskDescriptions.length)
+ taskDescriptions.map(_.executorId)
+ }
+ val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+ assert(count == 2)
+ assert(!failedTaskSet)
+ }
+
+ test("Scheduler keeps packing the assignment to the same worker") {
+ val taskScheduler = setupScheduler(("spark.task.assigner",
classOf[PackedAssigner].getName))
+ val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4),
+ new WorkerOffer("executor1", "host1", 4))
+ val selectedExecutorIds = {
+ val taskSet = FakeTask.createTaskSet(4)
+ taskScheduler.submitTasks(taskSet)
+ val taskDescriptions =
taskScheduler.resourceOffers(workerOffers).flatten
+ assert(4 === taskDescriptions.length)
+ taskDescriptions.map(_.executorId)
+ }
+
+ val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+ assert(count == 4)
+ assert(!failedTaskSet)
+ }
+
+
--- End diff --
Nit: remove this empty line.
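
For anyone skimming these tests, here is a minimal sketch of the two placement strategies they exercise, assuming the assigner simply ranks offers by free cores on each pick. This is a hypothetical simplification: `Offer`, `balancedPick`, and `packedPick` are illustrative names, not the PR's actual `BalancedAssigner`/`PackedAssigner` API.

```scala
// Hypothetical sketch only; the real assigners in the PR may differ.
case class Offer(executorId: String, host: String, freeCores: Int)

// Balanced: hand the next task to the offer with the most free cores,
// spreading work across executors (first two tests).
def balancedPick(offers: Seq[Offer]): Offer =
  offers.maxBy(_.freeCores)

// Packed: hand the next task to the offer with the fewest free cores that
// can still fit it, concentrating work on few executors (last two tests).
def packedPick(offers: Seq[Offer], coresPerTask: Int = 1): Offer =
  offers.filter(_.freeCores >= coresPerTask).minBy(_.freeCores)

// Mirrors the first and third tests' offers: 2 cores vs. 4 cores.
val offers = Seq(Offer("executor0", "host0", 2), Offer("executor1", "host1", 4))
assert(balancedPick(offers).executorId == "executor1")
assert(packedPick(offers).executorId == "executor0")
```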