bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r389454269
##########
File path:
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
##########
@@ -195,6 +195,196 @@ class TaskSchedulerImplSuite extends SparkFunSuite with
LocalSparkContext with B
assert(!failedTaskSet)
}
+ def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+ val conf = new SparkConf()
+ sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+ val taskScheduler = new TaskSchedulerImpl(sc,
+ sc.conf.get(config.TASK_MAX_FAILURES),
+ clock = clock) {
+ override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures:
Int): TaskSetManager = {
+ new TaskSetManager(this, taskSet, maxTaskFailures,
blacklistTrackerOpt, clock)
+ }
+ override def shuffleOffers(offers: IndexedSeq[WorkerOffer]):
IndexedSeq[WorkerOffer] = {
+ // Don't shuffle the offers around for this test. Instead, we'll just
pass in all
+ // the permutations we care about directly.
+ offers
+ }
+ }
+ // Need to initialize a DAGScheduler for the taskScheduler to use for
callbacks.
+ new DAGScheduler(sc, taskScheduler) {
+ override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+ override def executorAdded(execId: String, host: String): Unit = {}
+ }
+ taskScheduler.initialize(new FakeSchedulerBackend)
+ val taskSet = FakeTask.createTaskSet(8, 1, 1,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1"))
+ )
+ taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+ taskScheduler.submitTasks(taskSet)
+ taskScheduler
+ }
+
+ test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer
before " +
+ "any resources have been rejected") {
+ val clock = new ManualClock()
+ val taskScheduler = setupTaskScheduler(clock)
+ val advanceAmount = 2000
+
+ // by default, new partial resource (isAllFreeResources = false) offers
reset timer
+ // if the resource is accepted
+ clock.advance(advanceAmount)
+ assert(taskScheduler
+ .resourceOffers(
+ IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+ isAllFreeResources = false)
+ .flatten.length === 1)
Review comment:
The locality level starts at PROCESS_LOCAL.
The first resource (exec1, host1) is process-local.
The second resource (exec2, host1) is only node-local.
All the test cases I added start at PROCESS_LOCAL, which is why the
resource (exec1, host1) is offered before the tasks are submitted. Was
that the confusing part?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]