[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-05-02 Thread GitBox


bmarcott commented on a change in pull request #27207:
URL: https://github.com/apache/spark/pull/27207#discussion_r418921358



##
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##
@@ -466,12 +503,28 @@ private[spark] class TaskSchedulerImpl(
     }.sum
   }
 
+  private def minTaskLocality(
+      l1: Option[TaskLocality],
+      l2: Option[TaskLocality]) : Option[TaskLocality] = {
+    if (l1.isEmpty) {
+      l2
+    } else if (l2.isEmpty) {
+      l1
+    } else if (l1.get < l2.get) {
+      l1
+    } else {
+      l2
+    }
+  }
+
   /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
-  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+  def resourceOffers(
+      offers: IndexedSeq[WorkerOffer],
+      isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {

Review comment:
   I think I originally did this to avoid breaking the API and to keep something closer
   to the previous behavior for callers who hadn't migrated to setting it to false.
   Let me know if this is the wrong approach.
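   As a rough sketch of what the default buys (simplified stand-in types, not the real
   scheduler API): existing call sites that pass only `offers` keep compiling and behave
   like a full resource offer, while migrated callers opt into partial-offer semantics
   explicitly.

```scala
object DefaultParamSketch {
  // Hypothetical, trimmed-down stand-in; not the real TaskSchedulerImpl.
  case class Offer(executorId: String, host: String, cores: Int)

  def resourceOffers(
      offers: IndexedSeq[Offer],
      isAllFreeResources: Boolean = true): Seq[String] = {
    // Placeholder body: in the real scheduler the flag only changes how the
    // delay-scheduling timer is reset, not which offers are considered.
    offers.map(o => s"${o.executorId} (allFree=$isAllFreeResources)")
  }

  def main(args: Array[String]): Unit = {
    // Legacy caller: unchanged source, treated as a full resource offer.
    println(resourceOffers(IndexedSeq(Offer("exec1", "host1", 1))))
    // Migrated caller: explicitly marks a partial offer.
    println(resourceOffers(IndexedSeq(Offer("exec1", "host1", 1)), isAllFreeResources = false))
  }
}
```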








[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-05-02 Thread GitBox


bmarcott commented on a change in pull request #27207:
URL: https://github.com/apache/spark/pull/27207#discussion_r418921110



##
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##
@@ -466,12 +503,28 @@ private[spark] class TaskSchedulerImpl(
     }.sum
   }
 
+  private def minTaskLocality(
+      l1: Option[TaskLocality],
+      l2: Option[TaskLocality]) : Option[TaskLocality] = {
+    if (l1.isEmpty) {
+      l2
+    } else if (l2.isEmpty) {
+      l1
+    } else if (l1.get < l2.get) {
+      l1
+    } else {
+      l2
+    }
+  }
+
   /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.

Review comment:
   Yes, I can add something like: if true, then the `offers` parameter contains all
   workers and their free resources. See the delay scheduling comments in the class
   description.
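   A sketch of the wording I have in mind (not the final text):

```scala
  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   *
   * @param isAllFreeResources if true, the `offers` parameter contains all of the workers and
   *                           their free resources; see the delay scheduling comments in the
   *                           class description for how this affects the locality wait timer.
   */
```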








[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-04-23 Thread GitBox


bmarcott commented on a change in pull request #27207:
URL: https://github.com/apache/spark/pull/27207#discussion_r413543040



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -543,6 +543,16 @@ package object config {
   .version("1.2.0")
   .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
 
+  private[spark] val LEGACY_LOCALITY_WAIT_RESET =
+ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch")
+.doc("Whether to use the legacy behavior of locality wait, which resets 
the delay timer " +
+  "anytime a task is scheduled. See Delay Scheduling section of 
TaskSchedulerImpl's class " +
+  "documentation for more details.")
+.internal()
+.version("3.0.0")

Review comment:
   AFAIK that is correct; we didn't backport it.
   Here is the PR fixing these: https://github.com/apache/spark/pull/28307
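   For completeness, the flag can be flipped back on from a SparkConf using the raw key
   defined above (it has to be the raw string from outside the spark package, since the
   constant is private[spark] and marked internal()); a minimal sketch:

```scala
import org.apache.spark.SparkConf

object LegacyLocalityWaitExample {
  def main(args: Array[String]): Unit = {
    // Opt back into the legacy reset-on-task-launch behavior for workloads that
    // depended on it. The key matches the ConfigBuilder above.
    val conf = new SparkConf()
      .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
    println(conf.get("spark.locality.wait.legacyResetOnTaskLaunch"))
  }
}
```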








[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-04-23 Thread GitBox


bmarcott commented on a change in pull request #27207:
URL: https://github.com/apache/spark/pull/27207#discussion_r413542822



##
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##
@@ -319,20 +336,38 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
+    resetOnPreviousOffer -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
   }
 
+  /**
+   * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]].
+   *
+   * @param taskSet task set manager to offer resources to
+   * @param maxLocality max locality to allow when scheduling
+   * @param shuffledOffers shuffled resource offers to use for scheduling,
+   *                       remaining resources are tracked by below fields as tasks are scheduled
+   * @param availableCpus  remaining cpus per offer,
+   *                       value at index 'i' corresponds to shuffledOffers[i]
+   * @param availableResources remaining resources per offer,
+   *                           value at index 'i' corresponds to shuffledOffers[i]
+   * @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i]
+   * @param addressesWithDescs tasks scheduler per host:port, used for barrier tasks
+   * @return tuple of (had delay schedule rejects?, option of min locality of launched task)

Review comment:
   Thanks for catching this. The code changed mid-PR and I missed updating the doc.








[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-04-02 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r402738705
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##
 @@ -319,20 +336,38 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
+    resetOnPreviousOffer -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
   }
 
+  /**
+   * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]].
+   *
+   * @param taskSet task set to offer resources to
 
 Review comment:
   Would you prefer I just change the doc, or the variable name as well?





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-04-02 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r402739821
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##
 @@ -466,12 +503,28 @@ private[spark] class TaskSchedulerImpl(
     }.sum
   }
 
+  def minTaskLocality(
 
 Review comment:
    





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-04-02 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r402739744
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##
 @@ -196,6 +196,241 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 assert(!failedTaskSet)
   }
 
+  def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+val conf = new SparkConf()
+sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+val taskScheduler = new TaskSchedulerImpl(sc,
+  sc.conf.get(config.TASK_MAX_FAILURES),
+  clock = clock) {
+  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: 
Int): TaskSetManager = {
+new TaskSetManager(this, taskSet, maxTaskFailures, 
blacklistTrackerOpt, clock)
+  }
+  override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): 
IndexedSeq[WorkerOffer] = {
+// Don't shuffle the offers around for this test.  Instead, we'll just 
pass in all
+// the permutations we care about directly.
+offers
+  }
+}
+// Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
+new DAGScheduler(sc, taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+  override def executorAdded(execId: String, host: String): Unit = {}
+}
+taskScheduler.initialize(new FakeSchedulerBackend)
+val taskSet = FakeTask.createTaskSet(8, 1, 1,
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1"))
+)
+
+// Offer resources first so that when the taskset is submitted it can 
initialize
+// with proper locality level. Otherwise, ANY would be the only locality 
level.
+// See TaskSetManager.computeValidLocalityLevels()
+// This begins the task set as PROCESS_LOCAL locality level
+taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+taskScheduler.submitTasks(taskSet)
+taskScheduler
+  }
+
+  test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer 
before " +
+"any resources have been rejected") {
+val clock = new ManualClock()
+// All tasks created here are local to exec1, host1.
+// Locality level starts at PROCESS_LOCAL.
+val taskScheduler = setupTaskScheduler(clock)
+// Locality levels increase at 3000 ms.
+val advanceAmount = 3000
+
+// Advancing clock increases locality level to NODE_LOCAL.
+clock.advance(advanceAmount)
+
+// If there hasn't yet been any full resource offers,
+// partial resource (isAllFreeResources = false) offers reset delay 
scheduling
+// if this and previous offers were accepted.
+// This line resets the timer and locality level is reset to PROCESS_LOCAL.
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+isAllFreeResources = false)
+  .flatten.length === 1)
+
+// This NODE_LOCAL task should not be accepted.
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+isAllFreeResources = false)
+  .flatten.isEmpty)
+  }
+
+  test("SPARK-18886 - delay scheduling timer is reset when it accepts all 
resources offered when" +
+"isAllFreeResources = true") {
+val clock = new ManualClock()
+// All tasks created here are local to exec1, host1.
+// Locality level starts at PROCESS_LOCAL.
+val taskScheduler = setupTaskScheduler(clock)
+// Locality levels increase at 3000 ms.
+val advanceAmount = 3000
+
+// Advancing clock increases locality level to NODE_LOCAL.
+clock.advance(advanceAmount)
+
+// If there are no rejects on an all resource offer, delay scheduling is 
reset.
+// This line resets the timer and locality level is reset to PROCESS_LOCAL.
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+isAllFreeResources = true)
+  .flatten.length === 1)
+
+// This NODE_LOCAL task should not be accepted.
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+isAllFreeResources = false)
+  .flatten.isEmpty)
+  }
+
+  test("SPARK-18886 - partial resource offers (isAllFreeResources = false) 
reset " +
+"time if last full resource offer (isAllResources = true) was accepted as 
well as any " +
+"following partial resource offers") {
+val clock = new ManualClock()
+// All tasks created here 

[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-04-02 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r402739751
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##
 @@ -196,6 +196,241 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 assert(!failedTaskSet)
   }
 
+  def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+val conf = new SparkConf()
+sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+val taskScheduler = new TaskSchedulerImpl(sc,
+  sc.conf.get(config.TASK_MAX_FAILURES),
+  clock = clock) {
+  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: 
Int): TaskSetManager = {
+new TaskSetManager(this, taskSet, maxTaskFailures, 
blacklistTrackerOpt, clock)
+  }
+  override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): 
IndexedSeq[WorkerOffer] = {
+// Don't shuffle the offers around for this test.  Instead, we'll just 
pass in all
+// the permutations we care about directly.
+offers
+  }
+}
+// Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
+new DAGScheduler(sc, taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+  override def executorAdded(execId: String, host: String): Unit = {}
+}
+taskScheduler.initialize(new FakeSchedulerBackend)
+val taskSet = FakeTask.createTaskSet(8, 1, 1,
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1"))
+)
+
+// Offer resources first so that when the taskset is submitted it can 
initialize
+// with proper locality level. Otherwise, ANY would be the only locality 
level.
+// See TaskSetManager.computeValidLocalityLevels()
+// This begins the task set as PROCESS_LOCAL locality level
+taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+taskScheduler.submitTasks(taskSet)
+taskScheduler
+  }
+
+  test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer 
before " +
+"any resources have been rejected") {
+val clock = new ManualClock()
+// All tasks created here are local to exec1, host1.
+// Locality level starts at PROCESS_LOCAL.
+val taskScheduler = setupTaskScheduler(clock)
+// Locality levels increase at 3000 ms.
+val advanceAmount = 3000
+
+// Advancing clock increases locality level to NODE_LOCAL.
+clock.advance(advanceAmount)
+
+// If there hasn't yet been any full resource offers,
+// partial resource (isAllFreeResources = false) offers reset delay 
scheduling
+// if this and previous offers were accepted.
+// This line resets the timer and locality level is reset to PROCESS_LOCAL.
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+isAllFreeResources = false)
+  .flatten.length === 1)
+
+// This NODE_LOCAL task should not be accepted.
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+isAllFreeResources = false)
+  .flatten.isEmpty)
+  }
+
+  test("SPARK-18886 - delay scheduling timer is reset when it accepts all 
resources offered when" +
+"isAllFreeResources = true") {
 
 Review comment:
   good  





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-30 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r400606676
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##
 @@ -901,18 +1136,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Here is the main check of this test -- we have the same offers again, and we schedule it
-    // successfully.  Because the scheduler first tries to schedule with locality in mind, at first
-    // it won't schedule anything on executor1.  But despite that, we don't abort the job.  Then the
-    // scheduler tries for ANY locality, and successfully schedules tasks on executor1.
+    // successfully.  Because the scheduler tries to schedule with locality in mind, at first
+    // it won't schedule anything on executor1.  But despite that, we don't abort the job.
     val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
-    assert(secondTaskAttempts.size == 2)
-    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
+    assert(secondTaskAttempts.isEmpty)
     assert(!failedTaskSet)
   }
 
   test("SPARK-16106 locality levels updated if executor added to existing host") {
     val taskScheduler = setupScheduler()
 
+    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
 
 Review comment:
   We don't need it; it's just that otherwise the test behaves differently because the
   resources aren't scheduled the same way (more resources are accepted up front with the
   new code).
   I can also make the test pass by setting the legacy flag, or by changing more of the
   test's logic.
   
   Previously the locality level would be reset on every task launch; now it is reset once
   per resourceOffers call (when certain conditions are met), as sketched below.
   Workloads that relied on the old behavior could possibly regress.
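   To make the difference concrete, an illustrative-only sketch of the two reset policies
   (these names are not the actual TaskSetManager/TaskSchedulerImpl fields, and the
   partial-offer handling exercised by the new tests is left out):

```scala
object DelayTimerResetSketch {
  // Simplified decision described above: legacy mode resets on any task launch,
  // while the new mode resets at most once per full resourceOffers round that
  // had no delay-scheduling rejects.
  def shouldResetDelayTimer(
      legacyResetOnTaskLaunch: Boolean,
      launchedAnyTask: Boolean,
      isAllFreeResources: Boolean,
      hadDelayScheduleRejects: Boolean): Boolean = {
    if (legacyResetOnTaskLaunch) {
      launchedAnyTask
    } else {
      isAllFreeResources && !hadDelayScheduleRejects
    }
  }
}
```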





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-30 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r400606153
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -543,6 +543,11 @@ package object config {
   .version("1.2.0")
   .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
 
+  private[spark] val LEGACY_LOCALITY_WAIT_RESET =
+ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch")
 
 Review comment:
   What all does adding internal() do? I see that internal configs are not exposed when
   SQLConf.getAllDefinedConfs is called.
   Which version should I add?





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-08 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r389454269
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##
 @@ -195,6 +195,196 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 assert(!failedTaskSet)
   }
 
+  def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+val conf = new SparkConf()
+sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+val taskScheduler = new TaskSchedulerImpl(sc,
+  sc.conf.get(config.TASK_MAX_FAILURES),
+  clock = clock) {
+  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: 
Int): TaskSetManager = {
+new TaskSetManager(this, taskSet, maxTaskFailures, 
blacklistTrackerOpt, clock)
+  }
+  override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): 
IndexedSeq[WorkerOffer] = {
+// Don't shuffle the offers around for this test.  Instead, we'll just 
pass in all
+// the permutations we care about directly.
+offers
+  }
+}
+// Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
+new DAGScheduler(sc, taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+  override def executorAdded(execId: String, host: String): Unit = {}
+}
+taskScheduler.initialize(new FakeSchedulerBackend)
+val taskSet = FakeTask.createTaskSet(8, 1, 1,
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1"))
+)
+taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
+taskScheduler.submitTasks(taskSet)
+taskScheduler
+  }
+
+  test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer 
before " +
+"any resources have been rejected") {
+val clock = new ManualClock()
+val taskScheduler = setupTaskScheduler(clock)
+val advanceAmount = 2000
+
+// by default, new partial resource (isAllFreeResources = false) offers 
reset timer
+// if the resource is accepted
+clock.advance(advanceAmount)
+assert(taskScheduler
+  .resourceOffers(
+IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+isAllFreeResources = false)
+  .flatten.length === 1)
 
 Review comment:
   The locality level starts at PROCESS_LOCAL.
   The first resource (exec1, host1) is process local; the second resource (exec2, host1)
   is only node local, roughly as in the snippet below.
   
   All the test cases I added start at PROCESS_LOCAL (which is why the resource exec1,
   host1 was offered before submitting tasks). Was that the confusing part?
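   Roughly the offer sequence being described, as it would appear inside one of these test
   bodies (using the suite's own setupTaskScheduler, WorkerOffer, and ManualClock helpers):

```scala
val clock = new ManualClock()
val taskScheduler = setupTaskScheduler(clock)

// exec1/host1 is the task's preferred executor, so this offer is PROCESS_LOCAL.
taskScheduler.resourceOffers(
  IndexedSeq(WorkerOffer("exec1", "host1", 1)), isAllFreeResources = false)

// exec2 is on host1 but is not the preferred executor, so this offer is only NODE_LOCAL.
taskScheduler.resourceOffers(
  IndexedSeq(WorkerOffer("exec2", "host1", 1)), isAllFreeResources = false)
```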





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-08 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r389453516
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
 ##
 @@ -195,6 +195,196 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 assert(!failedTaskSet)
   }
 
+  def setupTaskScheduler(clock: ManualClock): TaskSchedulerImpl = {
+val conf = new SparkConf()
+sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+val taskScheduler = new TaskSchedulerImpl(sc,
+  sc.conf.get(config.TASK_MAX_FAILURES),
+  clock = clock) {
+  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: 
Int): TaskSetManager = {
+new TaskSetManager(this, taskSet, maxTaskFailures, 
blacklistTrackerOpt, clock)
+  }
+  override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): 
IndexedSeq[WorkerOffer] = {
+// Don't shuffle the offers around for this test.  Instead, we'll just 
pass in all
+// the permutations we care about directly.
+offers
+  }
+}
+// Need to initialize a DAGScheduler for the taskScheduler to use for 
callbacks.
+new DAGScheduler(sc, taskScheduler) {
+  override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
+
+  override def executorAdded(execId: String, host: String): Unit = {}
+}
+taskScheduler.initialize(new FakeSchedulerBackend)
+val taskSet = FakeTask.createTaskSet(8, 1, 1,
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1"))
+)
+taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
 
 Review comment:
   Adding a comment.
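   Roughly this wording (matching the comment that shows up in the later revision of this
   setup, quoted earlier in this thread):

```scala
// Offer resources first so that when the taskset is submitted it can initialize
// with proper locality level. Otherwise, ANY would be the only locality level.
// See TaskSetManager.computeValidLocalityLevels()
// This begins the task set as PROCESS_LOCAL locality level
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
```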





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-08 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r389452680
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##
 @@ -343,8 +364,14 @@ private[spark] class TaskSchedulerImpl(
       if (availableCpus(i) >= CPUS_PER_TASK &&
         resourcesMeetTaskRequirements(availableResources(i))) {
         try {
-          for (task <- taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))) {
+          val (taskOption, didReject) =
+            taskSet.resourceOffer(execId, host, maxLocality, availableResources(i))
+          noDelayScheduleRejects &= !didReject
+          for (task <- taskOption) {
             tasks(i) += task
+            val locality = taskSet.taskInfos(task.taskId).taskLocality
+            minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality))
 
 Review comment:
    





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-08 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r389452653
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##
 @@ -387,12 +414,27 @@ private[spark] class TaskSchedulerImpl(
     ResourceUtils.resourcesMeetRequirements(resourcesFree, resourcesReqsPerTask)
   }
 
+  def minTaskLocality(l1: Option[TaskLocality], l2: Option[TaskLocality]) :
+  Option[TaskLocality] = {
 
 Review comment:
   Thanks for pointing out the difference. I don't want to break the style.





[GitHub] [spark] bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling.

2020-03-08 Thread GitBox
bmarcott commented on a change in pull request #27207: [SPARK-18886][CORE] Make 
Locality wait time measure resource under utilization due to delay scheduling.
URL: https://github.com/apache/spark/pull/27207#discussion_r389452597
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
 ##
 @@ -333,8 +351,11 @@ private[spark] class TaskSchedulerImpl(
       availableCpus: Array[Int],
       availableResources: Array[Map[String, Buffer[String]]],
       tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
-      addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
+      addressesWithDescs: ArrayBuffer[(String, TaskDescription)])
+    : (Boolean, Boolean, Option[TaskLocality]) = {
 
 Review comment:
    

