GitHub user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/731#discussion_r27089454
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -513,89 +513,122 @@ private[spark] class Master(
       }
     
       /**
    -   * Can an app use the given worker? True if the worker has enough memory and we haven't already
    -   * launched an executor for the app on it (right now the standalone backend doesn't like having
    -   * two executors on the same worker).
    +   * Can an app use the given worker?
        */
    -  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
    -    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
    +  private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
    +    val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0
    +    val allowToExecute = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app)
    +    allowToExecute && enoughResources
       }
     
       /**
    -   * Schedule the currently available resources among waiting apps. This method will be called
    -   * every time a new app joins or resource availability changes.
    +   * This function starts one or more executors on each worker.
    +   *
    +   * It traverses the list of available workers. In spreadOutApps mode, each visit to a
    +   * worker allocates at most spark.executor.cores cores (multiple executors per worker)
    +   * or 1 core (one executor per worker), less when the worker does not have enough free
    +   * cores or the remaining demand is smaller, plus app.desc.memoryPerExecutorMB megabytes
    +   * of memory, and tracks the allocation for each visit in a 2D array. Otherwise, it
    +   * allocates at most spark.executor.cores cores (multiple executors per worker) or
    +   * worker.coresFree cores (one executor per worker), and app.desc.memoryPerExecutorMB
    +   * megabytes of memory, to each executor.
        */
    -  private def schedule() {
    -    if (state != RecoveryState.ALIVE) { return }
    -
    -    // First schedule drivers, they take strict precedence over applications
    -    // Randomization helps balance drivers
    -    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    -    val numWorkersAlive = shuffledAliveWorkers.size
    -    var curPos = 0
    -
    -    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    -      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    -      // start from the last worker that was assigned a driver, and continue onwards until we have
    -      // explored all alive workers.
    -      var launched = false
    -      var numWorkersVisited = 0
    -      while (numWorkersVisited < numWorkersAlive && !launched) {
    -        val worker = shuffledAliveWorkers(curPos)
    -        numWorkersVisited += 1
    -        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
    -          launchDriver(worker, driver)
    -          waitingDrivers -= driver
    -          launched = true
    -        }
    -        curPos = (curPos + 1) % numWorkersAlive
    -      }
    -    }
    -
    -    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    -    // in the queue, then the second app, etc.
    +  private def startExecutorsOnWorker() {
         if (spreadOutApps) {
    -      // Try to spread out each app among all the nodes, until it has all its cores
           for (app <- waitingApps if app.coresLeft > 0) {
    -        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    -          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
    +        val memoryPerExecutor = app.desc.memoryPerExecutorMB
    +        val usableWorkers = workers.filter(_.state == WorkerState.ALIVE).
    +          filter(canUse(app, _)).toArray.sortBy(_.memoryFree / memoryPerExecutor).reverse
    +        // the maximum number of cores allocated to each executor per visit of the worker list
    +        val maxCoreAllocationPerRound = app.desc.maxCorePerExecutor.getOrElse(1)
    +        var maxCoresLeft = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
             val numUsable = usableWorkers.length
    -        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
    -        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    +        val maxExecutorPerWorker = {
    +          if (app.desc.maxCorePerExecutor.isDefined) {
    +            usableWorkers(0).memoryFree / memoryPerExecutor
    +          } else {
    +            1
    +          }
    +        }
    +        // A 2D array that tracks the number of cores used by each executor launched on
    +        // each worker. The first index refers to the usable worker, and the second index
    +        // refers to the executor launched on that worker.
    +        val assigned = Array.fill[Array[Int]](numUsable)(Array.fill[Int](maxExecutorPerWorker)(0))
    +        val executorNumberOnWorker = Array.fill[Int](numUsable)(0)
    +        val assignedSum = Array.fill[Int](numUsable)(0)
             var pos = 0
    -        while (toAssign > 0) {
    -          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
    -            toAssign -= 1
    -            assigned(pos) += 1
    +        while (maxCoresLeft > 0) {
    +          val memoryDemand = {
    +            if (app.desc.maxCorePerExecutor.isDefined) {
    +              (executorNumberOnWorker(pos) + 1) * memoryPerExecutor
    +            } else {
    +              memoryPerExecutor  
    +            }  
    +          }
    +          if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0) && 
    +            (usableWorkers(pos).memoryFree >= memoryDemand)) {
    +            val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos),
    +              maxCoreAllocationPerRound), maxCoresLeft)
    +            assigned(pos)(executorNumberOnWorker(pos)) += coreToAssign
    +            assignedSum(pos) += coreToAssign
    +            maxCoresLeft -= coreToAssign
    +            if (app.desc.maxCorePerExecutor.isDefined) {
    +              // if starting multiple executors on the worker, we move to the next executor
    +              executorNumberOnWorker(pos) += 1     
    +            }
               }
               pos = (pos + 1) % numUsable
             }
    -        // Now that we've decided how many cores to give on each node, let's actually give them
    -        for (pos <- 0 until numUsable) {
    -          if (assigned(pos) > 0) {
    -            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
    -            launchExecutor(usableWorkers(pos), exec)
    -            app.state = ApplicationState.RUNNING
    -          }
    +
    +        // Now that we've decided how many executors to launch on each node and how many
    +        // cores to give each one, let's actually give them
    +        for (pos <- 0 until numUsable; 
    +             execIdx <- 0 until math.max(executorNumberOnWorker(pos), 1)) {
    +          val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx))
    +          launchExecutor(usableWorkers(pos), exec)
    +          app.state = ApplicationState.RUNNING
             }
           }
         } else {
           // Pack each app into as few nodes as possible until we've assigned all its cores
           for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
    -        for (app <- waitingApps if app.coresLeft > 0) {
    -          if (canUse(app, worker)) {
    -            val coresToUse = math.min(worker.coresFree, app.coresLeft)
    -            if (coresToUse > 0) {
    -              val exec = app.addExecutor(worker, coresToUse)
    -              launchExecutor(worker, exec)
    -              app.state = ApplicationState.RUNNING
    -            }
    +        for (app <- waitingApps if app.coresLeft > 0 &&
    +          app.desc.memoryPerExecutorMB <= worker.memoryFree) {
    +          val coreNumPerExecutor = app.desc.maxCorePerExecutor.getOrElse(worker.coresFree)
    +          var coresLeft = math.min(worker.coresFree, app.coresLeft)
    +          while (coresLeft > 0 && app.desc.memoryPerExecutorMB <= worker.memoryFree) {
    +            val coreToAssign = math.min(coreNumPerExecutor, coresLeft)
    +            val exec = app.addExecutor(worker, coreToAssign)
    +            launchExecutor(worker, exec)
    +            coresLeft -= coreToAssign
    +            app.state = ApplicationState.RUNNING
               }
             }
           }
         }
       }
     
    +
    +  /**
    +   * Schedule the currently available resources among waiting apps. This method will be called
    +   * every time a new app joins or resource availability changes.
    +   */
    +  def schedule() {
    --- End diff --
    
    `private def schedule(): Unit = {`
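
    For readers following along, here is a minimal, self-contained sketch of the spread-out allocation loop described in the doc comment above. The `Worker` and `App` case classes, the `allocate` helper, and the idle-pass termination guard are simplifications introduced for illustration; they stand in for `WorkerInfo`/`ApplicationInfo` and are not the exact code in this PR:

    ```scala
    object SpreadOutSketch {
      case class Worker(coresFree: Int, memoryFree: Int)
      case class App(coresLeft: Int, memoryPerExecutorMB: Int, maxCorePerExecutor: Option[Int])

      /** Returns, for each worker, the cores assigned to each executor launched on it. */
      def allocate(app: App, workers: Array[Worker]): Array[Array[Int]] = {
        val memPerExec = app.memoryPerExecutorMB
        // At most maxCorePerExecutor cores per visit when set, otherwise 1 core per visit.
        val coresPerRound = app.maxCorePerExecutor.getOrElse(1)
        // Upper bound on executors per worker; 1 when multiple executors are not allowed.
        val maxExecsPerWorker =
          if (app.maxCorePerExecutor.isDefined) {
            math.max(workers.map(_.memoryFree / memPerExec).max, 1)
          } else {
            1
          }
        // assigned(w)(e) = cores given to executor e on worker w (the 2D array in the diff).
        val assigned = Array.fill(workers.length)(Array.fill(maxExecsPerWorker)(0))
        val execCount = Array.fill(workers.length)(0)   // executors planned per worker
        val assignedSum = Array.fill(workers.length)(0) // total cores planned per worker
        var coresToGive = math.min(app.coresLeft, workers.map(_.coresFree).sum)
        var pos = 0
        var idlePasses = 0 // guard added for the sketch: stop if a full pass assigns nothing
        while (coresToGive > 0 && idlePasses < workers.length) {
          // Each additional executor on a worker demands another memoryPerExecutorMB.
          val memoryDemand =
            if (app.maxCorePerExecutor.isDefined) (execCount(pos) + 1) * memPerExec
            else memPerExec
          val freeCores = workers(pos).coresFree - assignedSum(pos)
          if (freeCores > 0 && workers(pos).memoryFree >= memoryDemand &&
              execCount(pos) < maxExecsPerWorker) {
            val cores = math.min(math.min(freeCores, coresPerRound), coresToGive)
            assigned(pos)(execCount(pos)) += cores
            assignedSum(pos) += cores
            coresToGive -= cores
            if (app.maxCorePerExecutor.isDefined) execCount(pos) += 1
            idlePasses = 0
          } else {
            idlePasses += 1
          }
          pos = (pos + 1) % workers.length // round-robin over the usable workers
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        val workers = Array(Worker(coresFree = 8, memoryFree = 4096),
                            Worker(coresFree = 4, memoryFree = 2048))
        val app = App(coresLeft = 10, memoryPerExecutorMB = 1024, maxCorePerExecutor = Some(2))
        allocate(app, workers).zipWithIndex.foreach { case (execs, i) =>
          println(s"worker $i -> executor cores: ${execs.filter(_ > 0).mkString(", ")}")
        }
      }
    }
    ```

    With these numbers the app's 10 cores end up as five 2-core executors, three on the first worker and two on the second, which is the spreading behavior the doc comment describes.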


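    Similarly, here is a sketch of the pack-mode branch (the `else` above), which fills each worker with executors before moving to the next. The mutable fields and the inline memory debit are illustration-only simplifications; the diff itself checks `worker.memoryFree` each iteration and presumably relies on `launchExecutor`'s bookkeeping to reduce it:

    ```scala
    object PackModeSketch {
      case class Worker(var coresFree: Int, var memoryFree: Int)
      case class App(var coresLeft: Int, memoryPerExecutorMB: Int, maxCorePerExecutor: Option[Int])

      /** Launch executors for `app` on `worker`; returns the cores given to each executor. */
      def packOnto(app: App, worker: Worker): Seq[Int] = {
        // Without a per-executor cap, a single executor takes all free cores on the worker.
        val coresPerExecutor = app.maxCorePerExecutor.getOrElse(worker.coresFree)
        var coresLeft = math.min(worker.coresFree, app.coresLeft)
        val launched = scala.collection.mutable.ArrayBuffer.empty[Int]
        while (coresLeft > 0 && worker.memoryFree >= app.memoryPerExecutorMB) {
          val cores = math.min(coresPerExecutor, coresLeft)
          launched += cores
          coresLeft -= cores
          worker.coresFree -= cores
          // The sketch debits memory inline; it has no launchExecutor to do this for it.
          worker.memoryFree -= app.memoryPerExecutorMB
          app.coresLeft -= cores
        }
        launched.toSeq
      }

      def main(args: Array[String]): Unit = {
        val worker = Worker(coresFree = 8, memoryFree = 2048)
        val app = App(coresLeft = 8, memoryPerExecutorMB = 1024, maxCorePerExecutor = Some(4))
        // With 2048 MB free and 1024 MB per executor, two 4-core executors fit.
        println(packOnto(app, worker).mkString(", ")) // prints: 4, 4
      }
    }
    ```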