Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16376#discussion_r95435407
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -819,4 +819,84 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
         assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
         assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
       }
    +
    +  test("Locality should be used for bulk offers even with delay scheduling 
off") {
    +    // for testing, we create a task scheduler which lets us control how 
offers are shuffled
    +    val conf = new SparkConf()
    +      .set("spark.locality.wait", "0")
    +    sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
    +    // we create a manual clock just so we can be sure the clock doesn't advance at all in this test
    +    val clock = new ManualClock()
    +
    +    // We customize the task scheduler just to let us control the way offers are shuffled, so we
    +    // can be sure we try both permutations, and to control the clock on the tasksetmanager.
    +    val taskScheduler = new TaskSchedulerImpl(sc) {
    +      override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    +        // Don't shuffle the offers around for this test.  Instead, we'll just pass in all
    +        // the permutations we care about directly.
    +        offers
    +      }
    +      override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
    +        new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
    +      }
    +    }
    +    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
    +    new DAGScheduler(sc, taskScheduler) {
    +      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
    +      override def executorAdded(execId: String, host: String) {}
    +    }
    +    taskScheduler.initialize(new FakeSchedulerBackend)
    +
    +
    +    // Make two different offers -- one in the preferred location, one that is not.
    +    val offers = IndexedSeq(
    +      WorkerOffer("exec1", "host1", 1),
    +      WorkerOffer("exec2", "host2", 1)
    +    )
    +    Seq(false, true).foreach { swapOrder =>
    +      // Submit a taskset with locality preferences.
    +      val taskSet = FakeTask.createTaskSet(
    +        1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
    +      taskScheduler.submitTasks(taskSet)
    +      val shuffledOffers = if (swapOrder) offers.reverse else offers
    +      // Regardless of the order of the offers (after the task scheduler shuffles them), we should
    +      // always take advantage of the local offer.
    +      val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten
    +      withClue(s"swapOrder = $swapOrder") {
    +        assert(taskDescs.size === 1)
    +        assert(taskDescs.head.executorId === "exec1")
    +      }
    +    }
    +  }
    +
    +  test("With delay scheduling off, tasks can be run at any locality level 
immediately") {
    --- End diff --
    
    Sorry, one last thing -- I just realized: does this test need to first submit a
    local resource offer?  That would make sure the local executor is considered
    alive.  Otherwise, PROCESS_LOCAL won't be in the set of allowed locality levels
    because of the code here:
    https://github.com/apache/spark/pull/16376/files#diff-bad3987c83bd22d46416d3dd9d208e76R966,
    which would make this test somewhat less effective, if I understand correctly.
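
    If it helps, here is a rough sketch of what I mean (untested -- it just reuses
    the WorkerOffer / FakeTask helpers from the diff above to show the ordering):

        // Sketch only: make an offer on the preferred executor *before* submitting the
        // task set, so the scheduler registers "exec1" / "host1" as alive (and
        // PROCESS_LOCAL ends up in the allowed locality levels) without actually
        // launching the task there.
        taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))

        // Submit a task set that prefers exec1 / host1.
        val taskSet = FakeTask.createTaskSet(
          1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
        taskScheduler.submitTasks(taskSet)

        // Now offer only a non-preferred executor.  With spark.locality.wait = 0, the
        // task should still be scheduled immediately, even though a more local level
        // exists.
        val taskDescs =
          taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec2", "host2", 1))).flatten
        assert(taskDescs.size === 1)
        assert(taskDescs.head.executorId === "exec2")

    The idea is that the first offer happens before submitTasks, so nothing gets
    scheduled on exec1 -- it only makes the scheduler aware of that executor.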

