Ngone51 commented on code in PR #54766:
URL: https://github.com/apache/spark/pull/54766#discussion_r2922524935
##########
core/src/main/scala/org/apache/spark/deploy/master/Master.scala:
##########
@@ -838,36 +848,54 @@ private[deploy] class Master(
logInfo(log"Start scheduling for app ${MDC(LogKeys.APP_ID, app.id)}
with" +
log" rpId: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}")
val resourceDesc = app.getResourceDescriptionForRpId(rpId)
+ val aliveWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
val coresPerExecutor = resourceDesc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor, the cores left
will not be allocated
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an
executor
- val aliveWorkers = workers.toArray.filter(_.state ==
WorkerState.ALIVE)
- .filter(canLaunchExecutor(_, resourceDesc))
- val usableWorkers = workerSelectionPolicy match {
- case CORES_FREE_ASC => aliveWorkers.sortBy(w => (w.coresFree,
w.id))
- case CORES_FREE_DESC => aliveWorkers.sortBy(w => (w.coresFree,
w.id)).reverse
- case MEMORY_FREE_ASC => aliveWorkers.sortBy(w => (w.memoryFree,
w.id))
- case MEMORY_FREE_DESC => aliveWorkers.sortBy(w => (w.memoryFree,
w.id)).reverse
- case WorkerSelectionPolicy.WORKER_ID => aliveWorkers.sortBy(_.id)
- }
+ val usableWorkers = aliveWorkers.filter(canLaunchExecutor(_,
resourceDesc))
+ val sortedUsableWorkers = sortUsableWorkersByPolicy(usableWorkers)
val appMayHang = waitingApps.length == 1 &&
- waitingApps.head.executors.isEmpty && usableWorkers.isEmpty
- if (appMayHang) {
+ waitingApps.head.executors.isEmpty && sortedUsableWorkers.isEmpty
+ // If the app would hang, log a warning. Also check whether
spark.executor.cores
+ // exceeds the total cores of any worker; if so, cap it to the max
worker cores and
+ // recompute the usable workers so the executor can be placed.
+ val (effectiveResourceDesc, effectiveUsableWorkers) = if
(appMayHang) {
logWarning(log"App ${MDC(LogKeys.APP_ID, app.id)} requires more
resource " +
log"than any of Workers could have.")
+ resourceDesc.coresPerExecutor match {
+ case Some(requestedCores)
+ if conf.get(CAP_EXECUTOR_CORES_ENABLED) &&
+ aliveWorkers.nonEmpty && requestedCores >
aliveWorkers.map(_.cores).max =>
+ val maxWorkerCores = aliveWorkers.map(_.cores).max
Review Comment:
My concern with the "cap" is that executors could still be only partially
launched when some workers have fewer cores than the worker with the most cores.
In that case, those workers would still not be utilized.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]