Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/731#discussion_r25116480
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---
    @@ -581,6 +558,104 @@ private[spark] class Master(
         }
       }
     
    +  /**
    +   * This functions starts multiple executors on each worker.
    +   * It first calculates the maximum number of executors we can allocate to the application in this
    +   * scheduling moment according to the free memory space on each worker, then tries to allocate
    +   * executors on each worker according to the user-specified spark.executor.maxCoreNumPerExecutor.
    +   *
    +   * It traverses the available worker list. In spreadOutApps mode, it allocates at most
    +   * spark.executor.maxCoreNumPerExecutor cores (can be less than it when the worker does not have
    +   * enough cores or the demand is less than it) and app.desc.memoryPerExecutorMB megabytes memory
    +   * and tracks the resource allocation in a 2d array for each visit; Otherwise, it allocates at
    +   * most spark.executor.maxCoreNumPerExecutor cores and app.desc.memoryPerExecutorMB megabytes
    +   * to each executor but starts as many executors as possible (limited by the worker resources) for
    +   * each visit.
    +   */
    +  private def startMultiExecutorsPerWorker() {
    +    if (spreadOutApps) {
    +      for (app <- waitingApps if app.coresLeft > 0) {
    +        val memoryPerExecutor = app.desc.memoryPerExecutorMB
    +        val usableWorkers = workers.filter(_.state == WorkerState.ALIVE).
    +          filter(canUse(app, _)).toArray.sortBy(_.memoryFree / memoryPerExecutor).reverse
    +        val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get
    +        // get the maximum number of executors we can assign
    +        var leftExecutorNumToAssign = usableWorkers.map(_.memoryFree / memoryPerExecutor).sum
    +        var maxCoresLeft = app.coresLeft
    +        val numUsable = usableWorkers.length
    +        // 2D array to track the number of cores of each executor assigned to each worker
    +        val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int])
    --- End diff --
    
    We should explain in more detail what this 2D array contains:
    ```
    // A 2D array that tracks the number of cores used by each executor launched on
    // each worker. The first index refers to the usable worker, and the second index
    // refers to the executor launched on that worker.
    ```
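
    To make the indexing concrete, here is a minimal standalone sketch of the
    same structure. The object name, the helper `newAssignment`, and the core
    counts are hypothetical and chosen only for illustration; they are not part
    of the pull request. Because each worker can end up with a different number
    of executors, the "2D array" is really a jagged array of per-worker lists.

    ```scala
    import scala.collection.mutable.ListBuffer

    // Hypothetical helper object for illustration; not part of Master.scala.
    object AssignedCoresSketch {
      // One ListBuffer per usable worker. Array.fill takes its element by name,
      // so every slot gets its own fresh buffer rather than a shared one.
      def newAssignment(numUsable: Int): Array[ListBuffer[Int]] =
        Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int])

      def main(args: Array[String]): Unit = {
        val assigned = newAssignment(3)

        // First index: position of the worker in the usable-worker list.
        // Second index: the n-th executor launched on that worker.
        assigned(0) += 4 // worker 0, executor 0 is given 4 cores (made-up value)
        assigned(0) += 2 // worker 0, executor 1 is given 2 cores (made-up value)
        assigned(2) += 4 // worker 2, executor 0 is given 4 cores (made-up value)

        for ((executors, workerIdx) <- assigned.zipWithIndex) {
          println(s"worker $workerIdx: cores per executor = " +
            s"[${executors.mkString(", ")}], total = ${executors.sum}")
        }
      }
    }
    ```

    With this layout, `assigned(i)(j)` reads as "cores given to executor j on
    usable worker i", and `assigned(i).sum` gives the total cores handed out on
    that worker.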


