GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/7532#discussion_r35915249
--- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
@@ -563,32 +570,48 @@ private[master] class Master(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
- // If the number of cores per executor is not specified, then we can just schedule
- // 1 core at a time since we expect a single executor to be launched on each worker
- val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+ val coresPerExecutor = app.desc.coresPerExecutor
+ val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
- val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+ val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
- var freeWorkers = (0 until numUsable).toIndexedSeq
+ /** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
- usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
- usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+ val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+ val underLimit =
+ if (app.oneExecutorPerWorker() && assignedExecutors(pos) == 1) {
+ // We only have one executor per worker and have already started to assign cores to it,
+ // so assigning more to it does not change the number of executors we'll end up with
+ true
+ } else {
+ // Otherwise, we should launch a new executor only if we do not exceed the limit
+ assignedExecutors.sum + app.executors.size < app.executorLimit
--- End diff --
It feels a little weird to insert app limit logic here, since the function
seems to be about whether the worker can take a new executor.
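
To make the concern concrete, here is a minimal sketch of how the worker-level check and the app-level limit could be kept apart. It is not the code in this PR: `Worker`, `App`, `ExecutorChecks` and its method names are hypothetical, simplified stand-ins for `WorkerInfo`, `ApplicationInfo` and the logic in `canLaunchExecutor`, and skipping the memory check when only growing an existing executor is an assumption of the sketch.

```scala
// Hypothetical, simplified stand-ins for Spark's WorkerInfo and ApplicationInfo.
final case class Worker(coresFree: Int, memoryFree: Int)

final case class App(
    coresPerExecutor: Option[Int],
    memoryPerExecutorMB: Int,
    executorLimit: Int,
    runningExecutors: Int) {
  // Assumption: an app with no explicit cores-per-executor gets one executor per worker.
  def oneExecutorPerWorker: Boolean = coresPerExecutor.isEmpty
}

object ExecutorChecks {
  /** True when assigning more cores here would start a new executor
    * rather than grow the single executor already planned for this worker. */
  def launchesNewExecutor(app: App, executorsOnWorker: Int): Boolean =
    !app.oneExecutorPerWorker || executorsOnWorker == 0

  /** Worker-level check: free cores, plus free memory if a new executor is needed. */
  def workerHasRoom(
      worker: Worker,
      app: App,
      assignedCores: Int,
      executorsOnWorker: Int): Boolean = {
    val enoughCores =
      worker.coresFree - assignedCores >= app.coresPerExecutor.getOrElse(1)
    val assignedMemory = executorsOnWorker * app.memoryPerExecutorMB
    val enoughMemory =
      worker.memoryFree - assignedMemory >= app.memoryPerExecutorMB
    enoughCores && (!launchesNewExecutor(app, executorsOnWorker) || enoughMemory)
  }

  /** App-level check: the executor limit only matters when a new executor would start. */
  def appUnderLimit(app: App, executorsOnWorker: Int, totalNewExecutors: Int): Boolean =
    !launchesNewExecutor(app, executorsOnWorker) ||
      totalNewExecutors + app.runningExecutors < app.executorLimit
}
```

With this split, the scheduling loop would call both predicates, and the function that answers "can this worker take a new executor" no longer needs to know about `app.executorLimit`.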