[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18098 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r129176526

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
--- End diff --

My bad, I was looking at the wrong commit... ignore this.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r129095888

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
--- End diff --

@skonto Thanks. I don't think I fully understand this comment. I already updated `TaskSetManager.scala` to use this configuration variable.
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r128243453

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -72,6 +72,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)
+  private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
+    .timeConf(TimeUnit.MILLISECONDS)
+    .createWithDefaultString("3s")
+
--- End diff --

This default value is also set in the TaskManager; let's refactor it and reuse the same constant.
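A minimal sketch of the refactor suggested above, assuming a plain `Map[String, String]` stands in for `SparkConf`: the key and its "3s" default are declared once, and every consumer reads through the same entry. `TimeConfEntry`, `LocalityConf`, `TaskSetSide` and `MesosBackendSide` are hypothetical names, not Spark classes, and the parser handles only `ms`, `s` and bare millisecond values, unlike Spark's real time-string parser.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a typed config entry: key and default live in one place.
final case class TimeConfEntry(key: String, defaultValue: String) {
  // Parses "3s", "500ms" or a bare number (treated as milliseconds).
  // Assumption: only these forms are supported in this sketch.
  def readMs(conf: Map[String, String]): Long = {
    val raw = conf.getOrElse(key, defaultValue).trim.toLowerCase
    if (raw.endsWith("ms")) raw.dropRight(2).trim.toLong
    else if (raw.endsWith("s")) TimeUnit.SECONDS.toMillis(raw.dropRight(1).trim.toLong)
    else raw.toLong
  }
}

object LocalityConf {
  // Single source of truth for the locality wait and its "3s" default.
  val LocalityWait: TimeConfEntry = TimeConfEntry("spark.locality.wait", "3s")
}

// Two hypothetical consumers that would otherwise each hard-code "3s".
final class TaskSetSide(conf: Map[String, String]) {
  val localityWaitMs: Long = LocalityConf.LocalityWait.readMs(conf)
}

final class MesosBackendSide(conf: Map[String, String]) {
  val localityWaitMs: Long = LocalityConf.LocalityWait.readMs(conf)
}
```

Because both consumers go through `LocalityConf.LocalityWait`, changing the default means editing exactly one line.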
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r128235076

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
--- End diff --

Should we move this condition check into `canLaunchTask`?
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122734471

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
--- End diff --

Added comment.
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122734516

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
--- End diff --

Done
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122734414

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -291,6 +300,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         return
       }
+      if (numExecutors >= executorLimit) {
+        logDebug("Executor limit reached. numExecutors: " + numExecutors() +
--- End diff --

Done
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122734682

--- Diff: resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala ---
@@ -586,6 +586,44 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(backend.isReady)
   }
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "2s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    var id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+    id = 2
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+
+    // Offer local resource
+    id = 10
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    var launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 1)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(3000)
+
+    // Offer non-local resource, which should be accepted
+    id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 2)
+  }
--- End diff --

Updated the test to increase test coverage.
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122734548

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
+      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      logDebug("Skipping host and waiting for locality. host: " + offerHostname)
+      return false
+    }
+    true
--- End diff --

Done
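For reasoning about the behavior, the `satisfiesLocality` logic in the diff above can be modeled as a pure function by passing in the state it reads: the dynamic-allocation flag, `hostToLocalTaskCount`, the hosts that already run executors, and the clock. This is a hypothetical standalone sketch, not the PR code; `LocalityCheck` and the parameter names are assumptions.

```scala
// Hypothetical, dependency-free model of the satisfiesLocality check above.
// Spark state (conf, slaves, hostToLocalTaskCount, clock) is passed in explicitly.
object LocalityCheck {
  def satisfiesLocality(
      offerHostname: String,
      dynamicAllocationEnabled: Boolean,
      hostToLocalTaskCount: Map[String, Int],
      hostsWithRunningExecutors: Set[String],
      nowMs: Long,
      localityWaitStartMs: Long,
      localityWaitMs: Long): Boolean = {
    if (!dynamicAllocationEnabled || hostToLocalTaskCount.isEmpty) {
      // No dynamic allocation or no locality hints: nothing to wait for.
      true
    } else {
      // Preferred hosts that do not already run an executor.
      val remainingHosts = hostToLocalTaskCount.keySet -- hostsWithRunningExecutors
      val stillWaiting = nowMs - localityWaitStartMs <= localityWaitMs
      // A non-preferred host is rejected only while the wait window is open.
      remainingHosts.contains(offerHostname) || !stillWaiting
    }
  }
}
```

Note that a preferred host which already runs an executor is treated like any other non-preferred host, which is the behavior questioned in the review comment on `remainingHosts`.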
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122612344

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
--- End diff --

So we exclude slaves which already have executors launched on them, even if they match the locality. I guess it's worth adding a comment here explaining the motivation.
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122611850

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -291,6 +300,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         return
       }
+      if (numExecutors >= executorLimit) {
+        logDebug("Executor limit reached. numExecutors: " + numExecutors() +
--- End diff --

nit: `numExecutors()` => `numExecutors` since you're using the latter everywhere else
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122615540

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
+      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      logDebug("Skipping host and waiting for locality. host: " + offerHostname)
+      return false
+    }
+    true
--- End diff --

nit: `true` => `return true` since you're using return elsewhere in this method
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122612179

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
--- End diff --

I think the Spark code style is `a.contains(b)` instead of `a contains b`. See the "Infix Methods" section of http://spark.apache.org/contributing.html
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r122615907

--- Diff: resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala ---
@@ -586,6 +586,44 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(backend.isReady)
   }
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "2s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    var id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+    id = 2
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+
+    // Offer local resource
+    id = 10
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    var launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 1)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(3000)
+
+    // Offer non-local resource, which should be accepted
+    id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 2)
+  }
--- End diff --

I would suggest increasing the test coverage by testing a second round of launching, calling `backend.requestTotalExecutors` / `offerResources` / `verify..` again. We can update the first call to `requestTotalExecutors` to request only one executor, and increase it to two in the second call.
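One way to exercise the two-round scenario suggested above without `Thread.sleep` is to drive the locality wait from a manually advanced clock. The sketch below is a hypothetical in-memory model (`FakeClock` and `LocalityModel` are not Spark or test-suite classes), and it assumes the wait window restarts each time a new executor total is requested.

```scala
// Hypothetical clock that only moves when the test advances it.
final class FakeClock(private var nowMs: Long = 0L) {
  def millis: Long = nowMs
  def advance(ms: Long): Unit = nowMs += ms
}

// Hypothetical model of locality-aware executor placement.
// Assumption: the wait window restarts on every requestTotalExecutors call.
final class LocalityModel(clock: FakeClock, localityWaitMs: Long) {
  private var target = 0
  private var hints = Map.empty[String, Int]
  private var waitStart = clock.millis
  private var executorHosts = List.empty[String]

  def executorCount: Int = executorHosts.size

  def requestTotalExecutors(total: Int, hostToLocalTaskCount: Map[String, Int]): Unit = {
    target = total
    hints = hostToLocalTaskCount
    waitStart = clock.millis
  }

  // Returns true if an executor is launched on `host`.
  def offer(host: String): Boolean = {
    val remaining = hints.keySet -- executorHosts.toSet
    val expired = clock.millis - waitStart > localityWaitMs
    val accept = executorCount < target &&
      (hints.isEmpty || remaining.contains(host) || expired)
    if (accept) executorHosts = host :: executorHosts
    accept
  }
}
```

Round one requests a single executor for `hosts10`; round two raises the total to two and adds `hosts11`, then advances the clock past the wait instead of sleeping.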
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r120231509

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +521,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

@mgummelt Thanks for the thoughtful response. Sorry for the delay.

I am not entirely sure how multi-stage jobs would work, but in the current PR, after all the executors are started for a stage, the delay timeout resets for the next "stage". So, if Spark needs 3 executors, and 3 executors eventually start, the next time Spark needs more executors, the delay timeout would start fresh. However, if the next stage is requested before the previous stage is fully allocated, then the scenario you described happens. I had made the assumption that stages would be fully allocated before requesting additional executors for the next stage. Do you have any insights into how executors in stages are allocated?

I will also look into per-host delay timeouts.
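The single-timer behavior described above can be sketched as follows, assuming the timer restarts only when the running executor count reaches the requested target. `GlobalLocalityTimer` is a hypothetical name, not the PR's code. The second timer in the usage shows the problematic case: if the next request arrives before the previous one is fully allocated, the old start time is still in effect and the wait may already have expired.

```scala
// Hypothetical model of one global locality timer that restarts
// only once every requested executor is running.
final class GlobalLocalityTimer(localityWaitMs: Long) {
  private var waitStartMs = 0L

  // Called whenever the running count or the requested target changes.
  def onAllocationUpdate(running: Int, target: Int, nowMs: Long): Unit =
    if (running >= target) waitStartMs = nowMs

  // Once this is true, offers on non-preferred hosts are accepted.
  def waitExpired(nowMs: Long): Boolean = nowMs - waitStartMs > localityWaitMs
}
```

In the first case below the previous stage finished allocating at 10s, so a request at 11s still waits; in the second it did not, so a request at 11s gets no locality wait at all.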
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118615339

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +521,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

Now that I'm thinking more about this, it's a bit trickier than I originally thought. The problem with the current PR is that, while it works fine if all tasks are submitted at once, it won't have any benefit for tasks submitted after the scheduler starts, which is what happens in multi-stage jobs. Since you're measuring the delay timeout from the time the scheduling starts, we'll accept all offers that come after that delay, regardless of locality. So if the timeout is 3s, and the scheduler receives a request to schedule a task 5s after the job starts, then it will launch an executor to run that task on whatever offer happens to arrive first.

What we *really* want is to start counting the delay timeout for each task from the moment it was submitted to the `TaskScheduler`. However, the `ExecutorAllocationClient` interface doesn't provide us with task IDs. It only gives us hostnames via `hostToLocalTaskCount`. I think we can approximate the behavior we want by storing a per-host timestamp representing the last time its entry in `hostToLocalTaskCount` was modified. Then we only accept an offer for a different host if enough time has elapsed past this stored time. There's code in `YarnAllocator.scala` that does similar things that might theoretically provide inspiration, but I just looked and found it to be quite unreadable.

We should also only do this if dynamic allocation is enabled, so we should probably just condition the entire check on `Utils.isDynamicAllocationEnabled`.
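A minimal sketch of the per-host timestamp idea, assuming the backend sees each new `hostToLocalTaskCount` map and knows which hosts already run executors. `PerHostLocalityTracker` and its methods are hypothetical; this is one possible reading of the suggestion, not the code that was merged. A non-preferred offer is accepted only once every still-pending preferred host has waited longer than the locality wait, measured from when that host's entry last changed.

```scala
// Hypothetical per-host locality tracker: remembers when each host's entry
// in hostToLocalTaskCount was added or last changed.
final class PerHostLocalityTracker(localityWaitMs: Long) {
  private var counts = Map.empty[String, Int]
  private var lastModifiedMs = Map.empty[String, Long]

  // Stamp `nowMs` on every host whose entry is new or whose count changed,
  // and forget hosts that no longer appear in the request.
  def update(hostToLocalTaskCount: Map[String, Int], nowMs: Long): Unit = {
    val changed = hostToLocalTaskCount.collect {
      case (host, count) if !counts.get(host).contains(count) => host -> nowMs
    }.toMap
    lastModifiedMs =
      lastModifiedMs.filter { case (host, _) => hostToLocalTaskCount.contains(host) } ++ changed
    counts = hostToLocalTaskCount
  }

  // Accept an offer on a pending preferred host immediately; accept any other
  // host only after every pending preferred host has waited long enough.
  def acceptOffer(offerHost: String, hostsWithExecutors: Set[String], nowMs: Long): Boolean = {
    val pending = counts.keySet -- hostsWithExecutors
    pending.contains(offerHost) ||
      pending.forall(host => nowMs - lastModifiedMs(host) > localityWaitMs)
  }
}
```

Here a request that adds `hostB` at 5s gets its own 3s window even though `hostA`'s window expired long before, which is the multi-stage case the global timer misses.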
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118590493

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +521,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

I see what you mean. I wanted these semantics to be in effect only if location information is available. If there is no location information, I don't think there is a reason to wait for a delay. So, the original semantics I wanted was, "If location information is available, launch an executor on a host only when we have a task that wants to be on that host, or the configurable delay has elapsed." What should the semantics be?
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118586090

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -502,6 +521,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     )
   }
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

You've agreed that the semantics should be "Launch an executor on a host only when we have a task that wants to be on that host, or the configurable delay has elapsed." By launching tasks on any arbitrary host when no locality info is available, you're violating those semantics, because even before the delay has elapsed, the scheduler will launch a task on an agent that no task wants to be on. Does that make sense?
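The two semantics being debated differ only when no task has expressed a host preference. A hypothetical side-by-side sketch (`LocalitySemantics` is an illustrative name, and `waitElapsed` stands in for the delay check):

```scala
// Hypothetical comparison of the two acceptance rules discussed in this thread.
object LocalitySemantics {
  // Wait only when locality hints exist; with no hints, accept any host.
  def acceptWaitOnlyWithHints(offerHost: String, wantedHosts: Set[String], waitElapsed: Boolean): Boolean =
    wantedHosts.isEmpty || wantedHosts.contains(offerHost) || waitElapsed

  // Launch only on a host some task wants, or after the delay,
  // even when no task has expressed a preference.
  def acceptStrict(offerHost: String, wantedHosts: Set[String], waitElapsed: Boolean): Boolean =
    wantedHosts.contains(offerHost) || waitElapsed
}
```

With an empty preference set and the delay not yet elapsed, the first rule accepts the offer and the second declines it; in every other case they agree.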
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118581450

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -393,7 +410,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         val offerId = offer.getId.getValue
         val resources = remainingResources(offerId)
-        if (canLaunchTask(slaveId, resources)) {
+        var createTask = canLaunchTask(slaveId, resources)
+        if (hostToLocalTaskCount.nonEmpty) {
+          // Locality information is available
+          if (numExecutors() < executorLimit) {
--- End diff --

Done.
Github user gpang commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118581400

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -393,7 +409,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         val offerId = offer.getId.getValue
         val resources = remainingResources(offerId)
-        if (canLaunchTask(slaveId, resources)) {
+        var createTask = canLaunchTask(slaveId, resources)
+        if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

I'm assuming that if `hostToLocalTaskCount` is not empty, there is locality info. I did another check for the executors to launch. I did move the logic out to `satisfiesLocality()` and removed redundant `executorLimit` checks.
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118558695

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -393,7 +410,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         val offerId = offer.getId.getValue
         val resources = remainingResources(offerId)
-        if (canLaunchTask(slaveId, resources)) {
+        var createTask = canLaunchTask(slaveId, resources)
+        if (hostToLocalTaskCount.nonEmpty) {
+          // Locality information is available
+          if (numExecutors() < executorLimit) {
--- End diff --

s/numExecutors()/numExecutors
Github user mgummelt commented on a diff in the pull request: https://github.com/apache/spark/pull/18098#discussion_r118550928

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -393,7 +409,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         val offerId = offer.getId.getValue
         val resources = remainingResources(offerId)
-        if (canLaunchTask(slaveId, resources)) {
+        var createTask = canLaunchTask(slaveId, resources)
+        if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

It sounds like you're assuming `hostToLocalTaskCount.empty -> there are no executors to launch`. That may be true, but regardless, if that's the case, then `createTask` is already false, so this check is redundant, right? To make this clearer, please factor out this code into some function like `satisfiesLocality`, and instead of calling it here, for consistency, please add it as one of the constraints in `canLaunchTask`.
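A simplified, hypothetical sketch of the structure suggested above, where the locality check becomes one more conjunct in `canLaunchTask` alongside the resource and executor-limit checks. `OfferView` and `OfferEvaluator` are illustrative stand-ins; the real `canLaunchTask(slaveId, resources)` in the backend has a different signature and its own set of checks.

```scala
// Hypothetical, simplified view of a Mesos offer.
final case class OfferView(hostname: String, cpus: Double, memMb: Double)

// Hypothetical evaluator: every launch constraint lives in one predicate,
// and the locality check is just the last conjunct.
final class OfferEvaluator(
    executorCores: Int,
    executorMemoryMb: Int,
    executorLimit: Int,
    numExecutors: () => Int,
    satisfiesLocality: String => Boolean) {

  def canLaunchTask(offer: OfferView): Boolean =
    offer.cpus >= executorCores &&
      offer.memMb >= executorMemoryMb &&
      numExecutors() < executorLimit &&
      satisfiesLocality(offer.hostname)
}
```

Placing the locality check last means it only runs for offers that could otherwise host an executor, and callers no longer need a separate `createTask` flag.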
[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118529407

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -393,7 +409,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

Yes, those are the semantics I was going for. However, I'm not sure how the scenario you describe would happen. If no executors are required, offers will be declined and this logic will not be exercised. When would an executor be launched on any host?
[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...
Github user gpang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118528485

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -94,6 +94,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 private var totalCoresAcquired = 0
 private var totalGpusAcquired = 0
+ // The amount of time to wait for locality scheduling
+ private val localityWait = conf.getTimeAsMs("spark.locality.wait", "3s")
--- End diff --

I added `spark.locality.wait` as a `ConfigEntry`. Please take a look to see if I did that correctly.
[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118392464

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -393,7 +409,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 val offerId = offer.getId.getValue
 val resources = remainingResources(offerId)
-if (canLaunchTask(slaveId, resources)) {
+var createTask = canLaunchTask(slaveId, resources)
+if (hostToLocalTaskCount.nonEmpty) {
--- End diff --

The semantics you're going for are "Launch an executor on a host only when we have a task that wants to be on that host, or the configurable delay has elapsed", right? By launching an executor on any host when we don't yet have any tasks to schedule, you're violating those semantics, right?
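Read literally, the rule quoted above reduces to a small predicate. The sketch below encodes that strict reading; the object name `StrictLocalityRule`, the parameter names and the explicit `nowMs` clock are assumptions made for illustration and testability, not code from the PR. Under this reading an empty `hostToLocalTaskCount` does not by itself permit a launch, which is precisely the case being questioned.

```scala
// Hypothetical encoding of the strict semantics described in the review comment.
object StrictLocalityRule {
  /** Returns true if an executor may be launched on `host`: either some pending
    * task prefers `host`, or the locality delay has elapsed.
    *
    * @param hostToLocalTaskCount preferred host -> number of pending tasks wanting it
    * @param waitStartMs          time at which waiting for locality began
    * @param localityWaitMs       configured delay, e.g. 3000 for the "3s" default
    * @param nowMs                current time, passed in so the rule is easy to test
    */
  def mayLaunchOn(
      host: String,
      hostToLocalTaskCount: Map[String, Int],
      waitStartMs: Long,
      localityWaitMs: Long,
      nowMs: Long): Boolean = {
    val someTaskWantsHost = hostToLocalTaskCount.getOrElse(host, 0) > 0
    val delayElapsed = nowMs - waitStartMs >= localityWaitMs
    someTaskWantsHost || delayElapsed
  }
}
```

With an empty map, `mayLaunchOn` stays false until the delay elapses, whereas the patch under review skips the locality check entirely when the map is empty. gpang's reply in this thread argues that the difference should not matter in practice, because offers are declined when no executors are needed.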
[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...
Github user mgummelt commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r118381412

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala ---
@@ -94,6 +94,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 private var totalCoresAcquired = 0
 private var totalGpusAcquired = 0
+ // The amount of time to wait for locality scheduling
+ private val localityWait = conf.getTimeAsMs("spark.locality.wait", "3s")
--- End diff --

Can you instead add `spark.locality.wait` to https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/config/package.scala and use the `ConfigEntry` for it?
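The benefit of a `ConfigEntry` is that the key, the value type and the default (`"3s"`) are declared once, instead of being repeated at each `conf.getTimeAsMs("spark.locality.wait", "3s")` call site. Spark's `ConfigBuilder` is internal (`private[spark]`), so the self-contained sketch below re-creates the idea with a hypothetical `TimeConfEntry` class and a deliberately simplified time-string parser that understands only a subset of the suffixes Spark accepts (`ms`, `s`, `m`, `min`, `h`, and bare numbers as milliseconds).

```scala
import java.util.concurrent.TimeUnit

// Hypothetical, minimal re-creation of the typed-config-entry idea; not Spark's API.
final case class TimeConfEntry(key: String, defaultString: String) {
  // Reads the entry from raw string settings, falling back to the declared default.
  def readMs(settings: Map[String, String]): Long =
    TimeConfEntry.parseMs(settings.getOrElse(key, defaultString))
}

object TimeConfEntry {
  // Whole-string match: digits, optional whitespace, optional lowercase unit suffix.
  private val TimeString = """(\d+)\s*([a-z]*)""".r

  private val units: Map[String, TimeUnit] = Map(
    "" -> TimeUnit.MILLISECONDS, // bare numbers are treated as milliseconds
    "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS,
    "m" -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS)

  def parseMs(value: String): Long = value.trim.toLowerCase match {
    case TimeString(amount, suffix) if units.contains(suffix) =>
      units(suffix).toMillis(amount.toLong)
    case other =>
      throw new IllegalArgumentException(s"Invalid time string: '$other'")
  }
}

// Declared once, next to the other entries; call sites only reference the constant.
object ExampleConfig {
  val LOCALITY_WAIT: TimeConfEntry = TimeConfEntry("spark.locality.wait", "3s")
}
```

In Spark itself the equivalent declaration is a `ConfigBuilder("spark.locality.wait").timeConf(TimeUnit.MILLISECONDS).createWithDefaultString("3s")` entry in `internal/config/package.scala`, read with `conf.get(...)`.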
[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...
GitHub user gpang opened a pull request:

https://github.com/apache/spark/pull/18098

[SPARK-16944][Mesos] Improve data locality when launching new executors when dynamic allocation is enabled

## What changes were proposed in this pull request?

Improve the Spark-Mesos coarse-grained scheduler to consider the preferred locations when dynamic allocation is enabled.

## How was this patch tested?

Added a unit test, and performed manual testing on AWS.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gpang/spark mesos_data_locality

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18098.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #18098

commit 5309fa99df4431ab8b60ddb3681d19aa36331dc8
Author: Gene Pang
Date: 2017-05-19T21:04:18Z

Support data locality for executors in Mesos scheduler