Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/731#discussion_r27916624
--- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
@@ -524,49 +524,26 @@ private[master] class Master(
   }
 
   /**
-   * Can an app use the given worker? True if the worker has enough memory and we haven't already
-   * launched an executor for the app on it (right now the standalone backend doesn't like having
-   * two executors on the same worker).
+   * Can an app use the given worker?
    */
   private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
+    val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0
+    val allowToExecute = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app)
+    allowToExecute && enoughResources
   }
 
   /**
-   * Schedule the currently available resources among waiting apps. This method will be called
-   * every time a new app joins or resource availability changes.
+   * The resource allocator spread out each app among all the workers until it has all its cores in
+   * spreadOut mode otherwise packs each app into as few workers as possible until it has assigned
+   * all its cores. User can define spark.deploy.maxCoresPerExecutor per application to
+   * limit the maximum number of cores to allocate to each executor on each worker; if the parameter
+   * is not defined, then only one executor will be launched on a worker.
    */
-  private def schedule() {
-    if (state != RecoveryState.ALIVE) { return }
-
-    // First schedule drivers, they take strict precedence over applications
-    // Randomization helps balance drivers
-    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
-    val numWorkersAlive = shuffledAliveWorkers.size
-    var curPos = 0
-
-    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
-      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
-      // start from the last worker that was assigned a driver, and continue onwards until we have
-      // explored all alive workers.
-      var launched = false
-      var numWorkersVisited = 0
-      while (numWorkersVisited < numWorkersAlive && !launched) {
-        val worker = shuffledAliveWorkers(curPos)
-        numWorkersVisited += 1
-        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
-          launchDriver(worker, driver)
-          waitingDrivers -= driver
-          launched = true
-        }
-        curPos = (curPos + 1) % numWorkersAlive
-      }
-    }
-
+  private def startExecutorsOnWorkers() {
--- End diff --
here and everywhere
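
For context, the sketch below illustrates the allocation policy the new scaladoc describes: an executor may only be placed on a worker that has enough free memory and at least one free core, and an app's cores are either spread one at a time across all usable workers (spreadOut mode) or packed onto as few workers as possible. This is a simplified stand-in, not Spark's actual code: WorkerSlot, AppRequest, and allocateCores are made-up names, and the per-executor core cap from spark.deploy.maxCoresPerExecutor is not modeled.

    // Simplified model of a worker and an app's resource request (illustrative only).
    case class WorkerSlot(id: String, var coresFree: Int, var memoryFree: Int, var hasExecutor: Boolean = false)
    case class AppRequest(coresWanted: Int, memoryPerExecutorMB: Int, maxCoresPerExecutor: Option[Int])

    // Mirrors the canUse check in the diff: enough memory and at least one free core,
    // and either multiple executors per worker are allowed (the cap is defined) or the
    // worker does not already host an executor for this app.
    def canUse(app: AppRequest, worker: WorkerSlot): Boolean = {
      val enoughResources = worker.memoryFree >= app.memoryPerExecutorMB && worker.coresFree > 0
      val allowToExecute = app.maxCoresPerExecutor.isDefined || !worker.hasExecutor
      allowToExecute && enoughResources
    }

    // Decide how many cores each worker gets for this app.
    // spreadOut = true  -> grant one core at a time, round-robin over all usable workers
    // spreadOut = false -> fill each worker before moving on to the next
    // (the per-executor core cap is omitted here for brevity)
    def allocateCores(app: AppRequest, workers: Seq[WorkerSlot], spreadOut: Boolean): Map[String, Int] = {
      val usable = workers.filter(w => canUse(app, w)).toArray
      val assigned = Array.fill(usable.length)(0)
      var toAssign = math.min(app.coresWanted, usable.map(_.coresFree).sum)
      var pos = 0
      while (toAssign > 0) {
        if (usable(pos).coresFree - assigned(pos) > 0) {
          val grant = if (spreadOut) 1 else math.min(toAssign, usable(pos).coresFree - assigned(pos))
          assigned(pos) += grant
          toAssign -= grant
        }
        pos = (pos + 1) % usable.length
      }
      usable.map(_.id).zip(assigned).filter(_._2 > 0).toMap
    }

    // Example: 6 cores requested, three idle workers with 8 free cores each.
    val workers = Seq(WorkerSlot("w1", 8, 4096), WorkerSlot("w2", 8, 4096), WorkerSlot("w3", 8, 4096))
    val app = AppRequest(coresWanted = 6, memoryPerExecutorMB = 1024, maxCoresPerExecutor = Some(2))
    allocateCores(app, workers, spreadOut = true)   // Map(w1 -> 2, w2 -> 2, w3 -> 2)
    allocateCores(app, workers, spreadOut = false)  // Map(w1 -> 6)

Under these assumptions, spreadOut trades fewer cores per worker for better data locality across the cluster, while the packed mode keeps an app on as few machines as possible.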