Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7532#discussion_r35987443
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
---
    @@ -563,32 +570,48 @@ private[master] class Master(
           app: ApplicationInfo,
           usableWorkers: Array[WorkerInfo],
           spreadOutApps: Boolean): Array[Int] = {
    -    // If the number of cores per executor is not specified, then we can 
just schedule
    -    // 1 core at a time since we expect a single executor to be launched 
on each worker
    -    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
    +    val coresPerExecutor = app.desc.coresPerExecutor
    +    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
         val memoryPerExecutor = app.desc.memoryPerExecutorMB
         val numUsable = usableWorkers.length
         val assignedCores = new Array[Int](numUsable) // Number of cores to 
give to each worker
    -    val assignedMemory = new Array[Int](numUsable) // Amount of memory to 
give to each worker
    +    val assignedExecutors = new Array[Int](numUsable) // Number of new 
executors on each worker
         var coresToAssign = math.min(app.coresLeft, 
usableWorkers.map(_.coresFree).sum)
    -    var freeWorkers = (0 until numUsable).toIndexedSeq
     
    +    /** Return whether the specified worker can launch an executor for 
this app. */
         def canLaunchExecutor(pos: Int): Boolean = {
    -      usableWorkers(pos).coresFree - assignedCores(pos) >= 
coresPerExecutor &&
    -      usableWorkers(pos).memoryFree - assignedMemory(pos) >= 
memoryPerExecutor
    +      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
    +      val underLimit =
    +        if (app.oneExecutorPerWorker() && assignedExecutors(pos) == 1) {
    +          // We only have one executor per worker and have already started 
to assign cores to it,
    +          // so assigning more to it does not change the number of 
executors we'll end up with
    +          true
    +        } else {
    +          // Otherwise, we should launch a new executor only if we do not 
exceed the limit
    +          assignedExecutors.sum + app.executors.size < app.executorLimit
    --- End diff --
    
    @vanzin OK, I'll separate it out.
    @nishkamravi2 No, they cannot be collapsed like that, because in the mode 
where we have one executor per worker, we may have already put 1 core on each 
worker, then 2 cores, and so on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to