Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/731#discussion_r12512027
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
---
    @@ -532,6 +516,99 @@ private[spark] class Master(
         }
       }
     
    +  private def startMultiExecutorsPerWorker() {
    +    // allow user to run multiple executors in the same worker
    +    // (within the same worker JVM process)
    +    if (spreadOutApps) {
    +      for (app <- waitingApps if app.coresLeft > 0) {
    +        val memoryPerExecutor = app.desc.memoryPerExecutor
    +        var usableWorkers = workers.toArray.filter(_.state == 
WorkerState.ALIVE).
    +          filter(worker => worker.coresFree > 0 && worker.memoryFree >= 
memoryPerExecutor).
    +          sortBy(_.memoryFree / memoryPerExecutor).reverse
    +        val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get
    +        // get the maximum total number of executors we can assign
    +        var maxLeftExecutorsToAssign = usableWorkers.map(_.memoryFree / 
memoryPerExecutor).sum
    +        var maxCoresLeft = maxLeftExecutorsToAssign * maxCoreNumPerExecutor
    --- End diff --
    
    I am not very sure of this piece of code ... But it is too late in the
    night right now, so I don't want to make obviously stupid comments due to
    exhaustion :-)
    I am not sure if I can get to this PR coming week; please do get it checked
    out by someone else too !
    On 12-May-2014 5:26 am, "Nan Zhu" <[email protected]> wrote:
    
    > In core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
    >
    > > @@ -532,6 +516,99 @@ private[spark] class Master(
    > >      }
    > >    }
    > >
    > > +  private def startMultiExecutorsPerWorker() {
    > > +    // allow user to run multiple executors in the same worker
    > > +    // (within the same worker JVM process)
    > > +    if (spreadOutApps) {
    > > +      for (app <- waitingApps if app.coresLeft > 0) {
    > > +        val memoryPerExecutor = app.desc.memoryPerExecutor
    > > +        var usableWorkers = workers.toArray.filter(_.state == 
WorkerState.ALIVE).
    > > +          filter(worker => worker.coresFree > 0 && worker.memoryFree 
>= memoryPerExecutor).
    > > +          sortBy(_.memoryFree / memoryPerExecutor).reverse
    > > +        val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get
    > > +        // get the maximum total number of executors we can assign
    > > +        var maxLeftExecutorsToAssign = usableWorkers.map(_.memoryFree 
/ memoryPerExecutor).sum
    > > +        var maxCoresLeft = maxLeftExecutorsToAssign * 
maxCoreNumPerExecutor
    >
    > the idea here is, user has an expectation on the maximum cores to assign
    > to the application, but this expectation is usually not achievable due to
    > the limited cores in each worker;
    >
    > so the allocation here is to decide executorNum per Worker according to
    > the memory space on each worker (because this is a hard limitation), and
    > meet the user's expectation on the cores with the best efforts
    >
    > —
    > Reply to this email directly or view it on 
GitHub<https://github.com/apache/spark/pull/731/files#r12511614>
    > .
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to