Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/7274#discussion_r35140362
--- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala
---
@@ -543,59 +544,108 @@ private[master] class Master(
* multiple executors from the same application may be launched on the same worker if the worker
* has enough cores and memory. Otherwise, each executor grabs all the cores available on the
* worker by default, in which case only one executor may be launched on each worker.
+ *
+ * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
+ * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
+ * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
+ * allocated at a time, 12 cores from each worker would be assigned to each executor.
+ * Since 12 < 16, no executors would launch [SPARK-8881].
*/
- private def startExecutorsOnWorkers(): Unit = {
- // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
- // in the queue, then the second app, etc.
+ private[master] def scheduleExecutorsOnWorkers(
+ app: ApplicationInfo,
+ usableWorkers: Array[WorkerInfo],
+ spreadOutApps: Boolean): Array[Int] = {
+ // If the number of cores per executor is not specified, then we can just schedule
+ // 1 core at a time since we expect a single executor to be launched on each worker
+ val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+ val memoryPerExecutor = app.desc.memoryPerExecutorMB
+ val numUsable = usableWorkers.length
+ val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
+ val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+ var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
+ var pos = 0
+ var lastCoresToAssign = coresToAssign
if (spreadOutApps) {
- // Try to spread out each app among all the workers, until it has all its cores
- for (app <- waitingApps if app.coresLeft > 0) {
- val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
- worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
- .sortBy(_.coresFree).reverse
- val numUsable = usableWorkers.length
- val assigned = new Array[Int](numUsable) // Number of cores to give on each node
- var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
- var pos = 0
- while (toAssign > 0) {
- if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
- toAssign -= 1
- assigned(pos) += 1
- }
- pos = (pos + 1) % numUsable
+ // Try to spread out executors among workers (sparse scheduling)
+ while (coresToAssign > 0) {
+ if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+ usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
+ coresToAssign -= coresPerExecutor
+ assignedCores(pos) += coresPerExecutor
+ assignedMemory(pos) += memoryPerExecutor
}
- // Now that we've decided how many cores to give on each node, let's actually give them
- for (pos <- 0 until numUsable if assigned(pos) > 0) {
- allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
+ pos = (pos + 1) % numUsable
+ if (pos == 0) {
+ if (lastCoresToAssign == coresToAssign) {
+ return assignedCores
+ }
+ lastCoresToAssign = coresToAssign
}
}
} else {
- // Pack each app into as few workers as possible until we've assigned all its cores
- for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
- for (app <- waitingApps if app.coresLeft > 0) {
- allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
+ // Pack executors into as few workers as possible (dense scheduling)
+ while (coresToAssign > 0) {
+ while (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
+ usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor &&
+ coresToAssign > 0) {
+ coresToAssign -= coresPerExecutor
+ assignedCores(pos) += coresPerExecutor
+ assignedMemory(pos) += memoryPerExecutor
+ }
+ pos = (pos + 1) % numUsable
+ if (pos == 0) {
+ if (lastCoresToAssign == coresToAssign) {
+ return assignedCores
+ }
+ lastCoresToAssign = coresToAssign
--- End diff --
There's a lot of code duplication between the two cases now. I think
there's a nicer way to express this. The loop should really terminate when either
(1) we have allocated all the cores we can, or (2) no remaining worker has enough
resources to launch another executor. Something like the following:
```
...
val assignedMemory = ...
var coresToAssign = ...
val freeWorkers = new ArrayBuffer[Int]
def canLaunchExecutor(pos: Int): Boolean = {
  usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
    usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor &&
    coresToAssign >= coresPerExecutor
}
// Worker indices are 0-based, so iterate over 0 until numUsable (not 1 to numUsable)
freeWorkers ++= (0 until numUsable).filter(canLaunchExecutor)
while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
  freeWorkers.foreach { pos =>
    var keepScheduling = true
    while (keepScheduling && canLaunchExecutor(pos)) {
      coresToAssign -= coresPerExecutor
      assignedCores(pos) += coresPerExecutor
      assignedMemory(pos) += memoryPerExecutor
      // Spreading out an application means spreading out its executors across as many
      // workers as possible. If we are not spreading out, then we should keep scheduling
      // executors on this worker until we use all of its resources.
      keepScheduling = !spreadOutApps
    }
  }
  freeWorkers.retain(canLaunchExecutor)
}
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]