Github user andrewor14 commented on the pull request:
https://github.com/apache/spark/pull/1106#issuecomment-54680145
Hm, it looks like `launchDriver` is asynchronous, so there seems to be no easy way to identify workers that have already been scheduled to launch a driver. This means that even with the outstanding changes we might schedule too many drivers on the same worker. Now, we could keep track of each worker's remaining memory and cores after scheduling it to launch drivers, but this adds some complexity:
(semi-pseudocode)
```
// Worker ID -> resources already promised this round, as a (memory, cores) 2-tuple
val scheduledWorkerResources = new HashMap[Int, (Int, Int)]
val shuffledWorkers = Random.shuffle(workers).iterator.filter(_.state == ALIVE)
for (driver <- waitingDrivers) {
  if (shuffledWorkers.hasNext) {
    val candidateWorker = shuffledWorkers.next()
    val (scheduledMemory, scheduledCores) =
      scheduledWorkerResources.getOrElse(candidateWorker.id, (0, 0))
    val remainingMemory = candidateWorker.memory - scheduledMemory
    val remainingCores = candidateWorker.cores - scheduledCores
    // Compare against the remaining resources to account for workers that have
    // already been scheduled to launch drivers
    if (remainingMemory > driver.mem && remainingCores > driver.cores) {
      launchDriver(candidateWorker)
      ...
      // update scheduledWorkerResources
      scheduledWorkerResources(candidateWorker.id) =
        (scheduledMemory + driver.mem, scheduledCores + driver.cores)
      // add back this used worker into the pool to iterate through
    }
  }
}
```
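For context, here is a minimal, self-contained sketch of the same bookkeeping idea; `WorkerInfo`, `DriverInfo`, and the `launchDriver` callback below are simplified stand-ins made up for illustration, not the actual Master internals:
```
import scala.collection.mutable
import scala.util.Random

// Simplified stand-ins for the real worker/driver descriptions (hypothetical)
case class WorkerInfo(id: String, memory: Int, cores: Int, alive: Boolean)
case class DriverInfo(id: String, mem: Int, cores: Int)

def scheduleDrivers(
    workers: Seq[WorkerInfo],
    waitingDrivers: Seq[DriverInfo],
    launchDriver: (WorkerInfo, DriverInfo) => Unit): Unit = {
  // Worker ID -> (memory, cores) already promised to drivers this round
  val scheduled = mutable.HashMap.empty[String, (Int, Int)]
  val aliveWorkers = Random.shuffle(workers.filter(_.alive))
  for (driver <- waitingDrivers) {
    // Pick the first worker whose *remaining* resources still fit this driver
    val candidate = aliveWorkers.find { w =>
      val (m, c) = scheduled.getOrElse(w.id, (0, 0))
      w.memory - m >= driver.mem && w.cores - c >= driver.cores
    }
    candidate.foreach { w =>
      launchDriver(w, driver)
      val (m, c) = scheduled.getOrElse(w.id, (0, 0))
      // Record the promised resources so later drivers see the reduced capacity,
      // even though launchDriver is asynchronous and worker state isn't updated yet
      scheduled(w.id) = (m + driver.mem, c + driver.cores)
    }
  }
}
```
Scanning the shuffled list with `find` is one way to sidestep the "add the used worker back into the pool" step, since a partially used worker simply stays visible to later drivers with its reduced remaining capacity.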