[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-10-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18098


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-07-24 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r129176526
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
--- End diff --

My bad, I was looking at the wrong commit... ignore this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-07-24 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r129095888
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
--- End diff --

@skonto Thanks. I'm not sure I fully understand this comment; I already 
updated `TaskSetManager.scala` to use this configuration variable.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-07-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r128243453
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
 
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
--- End diff --

This default value is also set in `TaskSetManager`; let's refactor it and 
re-use the same constant...
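
A minimal, self-contained sketch of the refactor being suggested, using a hypothetical stand-in (`TimeEntry`) for Spark's internal `ConfigBuilder`/`ConfigEntry` machinery: the config package and `TaskSetManager` would both resolve the default through one shared entry instead of duplicating the `"3s"` literal.

```scala
// Hypothetical stand-in for ConfigBuilder/ConfigEntry, only to illustrate
// sharing one default between call sites.
object LocalityConfigSketch {
  final case class TimeEntry(key: String, defaultMs: Long)

  // Single source of truth for the default locality wait.
  val LOCALITY_WAIT = TimeEntry("spark.locality.wait", 3000L)

  // A consumer (e.g. TaskSetManager) resolves through the shared entry,
  // so changing the default in one place updates every call site.
  def localityWaitMs(conf: Map[String, Long]): Long =
    conf.getOrElse(LOCALITY_WAIT.key, LOCALITY_WAIT.defaultMs)
}
```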





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-07-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r128235076
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,25 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
--- End diff --

Should we move this condition check into `canLaunchTask`?





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-19 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122734471
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
--- End diff --

Added comment.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-19 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122734516
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
--- End diff --

Done





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-19 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122734414
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -291,6 +300,19 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 return
   }
 
+      if (numExecutors >= executorLimit) {
+        logDebug("Executor limit reached. numExecutors: " + numExecutors() +
--- End diff --

Done





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-19 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122734682
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -586,6 +586,44 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 assert(backend.isReady)
   }
 
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "2s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    var id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+    id = 2
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+
+    // Offer local resource
+    id = 10
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    var launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 1)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(3000)
+
+    // Offer non-local resource, which should be accepted
+    id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 2)
+  }
--- End diff --

Updated test to increase test coverage.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-19 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122734548
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
+      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      logDebug("Skipping host and waiting for locality. host: " + offerHostname)
+      return false
+    }
+    true
--- End diff --

Done





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122612344
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
--- End diff --

So we exclude slaves which already have executors launched on them, even if 
they match the locality. I guess it's worth adding a comment here explaining 
the motivation.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122611850
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -291,6 +300,19 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 return
   }
 
+      if (numExecutors >= executorLimit) {
+        logDebug("Executor limit reached. numExecutors: " + numExecutors() +
--- End diff --

nit: `numExecutors()` => `numExecutors` since you're using the latter 
everywhere else





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122615540
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
+      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      logDebug("Skipping host and waiting for locality. host: " + offerHostname)
+      return false
+    }
+    true
--- End diff --

nit: `true` => `return true` since you're using return elsewhere in this 
method





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122612179
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
--- End diff --

I think the Spark code style is `a.contains(b)` instead of `a contains b`.

See the "Infix Methods" section of http://spark.apache.org/contributing.html





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122615907
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -586,6 +586,44 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 assert(backend.isReady)
   }
 
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "2s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    var id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+    id = 2
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+
+    // Offer local resource
+    id = 10
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    var launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 1)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(3000)
+
+    // Offer non-local resource, which should be accepted
+    id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 2)
+  }
--- End diff --

I would suggest increasing the test coverage by testing a second round of 
launching, i.e. calling `backend.requestTotalExecutors` / `offerResources` / 
verify again.

We can update the first call to `requestTotalExecutors` to only request one 
executor, and increase to two in the second call. 





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-05 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r120231509
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +521,25 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

@mgummelt Thanks for the thoughtful response. Sorry for the delay.

I am not entirely sure how multi-stage jobs would work, but in the current 
PR, after all the executors are started for a stage, the delay timeout resets 
for the next "stage". So, if Spark needs 3 executors, and 3 executors 
eventually start, the next time Spark needs more executors, the delay timeout 
would start fresh. However, if the next stage is requested before the previous 
stage is fully allocated, then the scenario you described happens. I had made 
the assumption that stages would be fully allocated before requesting 
additional executors for the next stage. Do you have any insights into how 
executors in stages are allocated?

I will also look into per-host delay timeouts.
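
One plausible way to model the reset described above (field and method names here are hypothetical, not the PR's actual code): the locality wait clock restarts whenever the requested executor total rises, so a fully allocated stage does not eat into the next stage's wait.

```scala
// Hypothetical sketch of a locality-wait clock that restarts when
// executor demand increases (e.g. a new stage requests more executors).
class LocalityClockSketch(waitMs: Long) {
  private var waitStart = 0L
  private var demand = 0

  def requestTotalExecutors(requested: Int, now: Long): Unit = {
    if (requested > demand) waitStart = now // new demand: restart the clock
    demand = requested
  }

  // A non-local offer is acceptable only after the wait has elapsed.
  def acceptNonLocal(now: Long): Boolean = now - waitStart > waitMs
}
```

As the comment above notes, this breaks down when a new stage's request arrives before the previous stage is fully allocated: the clock has already been running, so non-local offers may be accepted too early.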





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118615339
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +521,25 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

Now that I'm thinking more about this, it's a bit trickier than I originally 
thought.

The problem with the current PR is that, while it works fine if all tasks 
are submitted at once, it won't have any benefit for tasks submitted after the 
scheduler starts, which is what happens in multi-stage jobs.  Since you're 
measuring the delay timeout from the time the scheduling starts, then we'll 
accept all offers that come after that delay, regardless of locality.  So if 
the timeout is 3s, and the scheduler receives a request to schedule a task 5s 
after the job starts, then it will launch an executor to run that task on 
whatever offer happens to arrive first.

What we *really* want is, for each task submitted to the `TaskScheduler`, 
we want to start counting the delay timeout from the moment the task was 
submitted.  However, the `ExecutorAllocationClient` interface doesn't provide 
us with task IDs.  It only gives us hostnames via `hostsToLocalTaskCount`.

I think we can approximate the behavior we want by storing a per-host 
timestamp representing the last time its entry in `hostsToLocalTaskCount` was 
modified.  Then we only accept an offer for a different host if enough time has 
elapsed past this stored time. 

There's code in `YarnAllocator.scala` that does similar things that might 
theoretically provide inspiration, but I just looked and found it to be quite 
unreadable.

We should also only do this if dynamic allocation is enabled, so we should 
probably just condition the entire check on `Utils.isDynamicAllocationEnabled`. 
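
A self-contained sketch of the per-host approximation proposed above (all names are hypothetical; this is not code from the PR or from `YarnAllocator.scala`): record when each host's entry in `hostsToLocalTaskCount` last changed, and accept an offer for a non-matching host only once the delay has elapsed for every still-pending host, with the whole check gated on dynamic allocation.

```scala
// Hypothetical per-host locality-wait bookkeeping.
class PerHostWaitSketch(waitMs: Long, dynamicAllocation: Boolean) {
  // host -> timestamp of the last change to its pending-task entry
  private var waitStart = Map.empty[String, Long]

  def updatePendingHosts(hosts: Set[String], now: Long): Unit = {
    waitStart ++= hosts.filterNot(waitStart.contains).map(_ -> now)
    waitStart --= waitStart.keySet.diff(hosts) // drop hosts no longer pending
  }

  def satisfiesLocality(offerHost: String, now: Long): Boolean = {
    if (!dynamicAllocation || waitStart.isEmpty) return true
    if (waitStart.contains(offerHost)) return true
    // Non-local offer: accept only if every pending host has waited out the delay.
    waitStart.values.forall(start => now - start > waitMs)
  }
}
```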
 





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118590493
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +521,25 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

I see what you mean. I wanted these semantics to only be in effect if 
location information is available. If there is no location information, I don't 
think there is a reason to wait for a delay.

So, the original semantics I wanted was, "If location information is 
available, launch an executor on a host only when we have a task that wants to 
be on that host, or the configurable delay has elapsed."

What should the semantics be?





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118586090
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +521,25 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

You've agreed that the semantics should be "Launch an executor on a host 
only when we have a task that wants to be on that host, or the configurable 
delay has elapsed"

By launching tasks on any arbitrary host when no locality info is 
available, you're violating those semantics, because even before the delay has 
elapsed, the scheduler will launch a task on an agent that no task wants to be 
on.  Does that make sense?
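
The agreed semantics ("launch on a host only when a task wants that host, or the configurable delay has elapsed") can be stated as a single predicate; a minimal sketch, with hypothetical parameter names:

```scala
// Hypothetical predicate capturing the agreed launch semantics.
def shouldLaunchOn(
    offerHost: String,
    desiredHosts: Set[String], // hosts some pending task wants to run on
    elapsedMs: Long,           // time since the locality wait started
    waitMs: Long): Boolean =
  desiredHosts.contains(offerHost) || elapsedMs > waitMs
```

Note that with no locality info (`desiredHosts` empty) this predicate still waits out the delay, which is exactly the violation being pointed out above.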





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118581450
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +410,30 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
 
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
+  // Locality information is available
+  if (numExecutors() < executorLimit) {
--- End diff --

Done.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118581400
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +409,30 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
 
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

I'm assuming that if `hostToLocalTaskCount` is not empty, there is locality 
info. I did another check for the executors to launch.

I did move the logic out to `satisfiesLocality()` and removed redundant 
executorLimit checks.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118558695
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +410,30 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
 
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
+  // Locality information is available
+  if (numExecutors() < executorLimit) {
--- End diff --

s/numExecutors()/numExecutors





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118550928
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +409,30 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
 
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

It sounds like you're assuming `hostToLocalTaskCount.isEmpty` implies there are no executors to launch. That may be true, but regardless, if that's the case, then `createTask` is already false, so this check is redundant, right?

To make this more clear, please factor out this code into some function 
like `satisfiesLocality`, and instead of calling it here, for consistency, 
please add it as one of the constraints in `canLaunchTask`.





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118529407
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +409,30 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
 
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

Yes, that is the semantics I was going for.

For the scenario you are mentioning, I'm not sure how that would happen. If 
there are no executors required, then offers will be declined and this logic 
will not be exercised. When would an executor be launched on any host?





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-25 Thread gpang
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118528485
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -94,6 +94,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private var totalCoresAcquired = 0
   private var totalGpusAcquired = 0
 
+  // The amount of time to wait for locality scheduling
+  private val localityWait = conf.getTimeAsMs("spark.locality.wait", "3s")
--- End diff --

I added `spark.locality.wait` as a `ConfigEntry`. Please take a look to see 
if I did that correctly.
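For reference, the resulting `ConfigEntry` (as it appears in the `package.scala` diff quoted earlier in this thread) is:

```scala
// Excerpt from core/src/main/scala/org/apache/spark/internal/config/package.scala
// as quoted in this review thread; requires java.util.concurrent.TimeUnit
// and Spark's internal ConfigBuilder.
private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
  .timeConf(TimeUnit.MILLISECONDS)
  .createWithDefaultString("3s")
```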





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-24 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118392464
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -393,7 +409,30 @@ private[spark] class 
MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
 
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

The semantics you're going for are "Launch an executor on a host only when we have a task that wants to be on that host, or the configurable delay has elapsed", right?  By launching an executor on any host when we don't yet have any tasks to schedule, you're violating those semantics, right?
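Those delay semantics could be sketched as follows; the class name, the explicit `nowMs` clock parameter, and the single per-backend timer are simplifying assumptions for illustration, not Spark's actual implementation:

```scala
// Illustrative sketch of the delay semantics: prefer hosts with pending
// local tasks, but fall back to any host once the configured wait
// (spark.locality.wait, default 3s) has elapsed.
class LocalityDelaySketch(localityWaitMs: Long) {
  private var waitingSinceMs: Option[Long] = None

  // Called when an offer from a non-preferred host arrives; `nowMs` is
  // passed in explicitly to keep the sketch testable.
  def delayElapsed(nowMs: Long): Boolean = waitingSinceMs match {
    case None =>
      waitingSinceMs = Some(nowMs) // start the clock on the first miss
      false
    case Some(start) =>
      nowMs - start >= localityWaitMs
  }
}
```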





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-24 Thread mgummelt
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118381412
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -94,6 +94,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private var totalCoresAcquired = 0
   private var totalGpusAcquired = 0
 
+  // The amount of time to wait for locality scheduling
+  private val localityWait = conf.getTimeAsMs("spark.locality.wait", "3s")
--- End diff --

Can you instead add spark.locality.wait to 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala
 and use the `ConfigEntry` for it?





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-05-24 Thread gpang
GitHub user gpang opened a pull request:

https://github.com/apache/spark/pull/18098

[SPARK-16944][Mesos] Improve data locality when launching new executors 
when dynamic allocation is enabled

## What changes were proposed in this pull request?

Improve the Spark-Mesos coarse-grained scheduler to consider the preferred 
locations when dynamic allocation is enabled.

## How was this patch tested?

Added a unit test, and performed manual testing on AWS.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gpang/spark mesos_data_locality

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18098.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18098


commit 5309fa99df4431ab8b60ddb3681d19aa36331dc8
Author: Gene Pang 
Date:   2017-05-19T21:04:18Z

Support data locality for executors in Mesos scheduler



