kiszk commented on a change in pull request #27313: [SPARK-29148][CORE] Add
stage level scheduling dynamic allocation and scheduler backend changes
URL: https://github.com/apache/spark/pull/27313#discussion_r369716175
##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -307,113 +324,187 @@ private[spark] class ExecutorAllocationManager(
* @return the delta in the target number of executors.
*/
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
- val maxNeeded = maxNumExecutorsNeeded
if (initializing) {
// Do not change our target while we are still initializing,
// Otherwise the first job may have to ramp up unnecessarily
0
- } else if (maxNeeded < numExecutorsTarget) {
- // The target number exceeds the number we actually need, so stop adding
new
- // executors and inform the cluster manager to cancel the extra pending
requests
- val oldNumExecutorsTarget = numExecutorsTarget
- numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
- numExecutorsToAdd = 1
-
- // If the new target has not changed, avoid sending a message to the
cluster manager
- if (numExecutorsTarget < oldNumExecutorsTarget) {
- // We lower the target number of executors but don't actively kill any
yet. Killing is
- // controlled separately by an idle timeout. It's still helpful to
reduce the target number
- // in case an executor just happens to get lost (eg., bad hardware, or
the cluster manager
- // preempts it) -- in that case, there is no point in trying to
immediately get a new
- // executor, since we wouldn't even use it yet.
- client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks,
hostToLocalTaskCount)
- logDebug(s"Lowering target number of executors to $numExecutorsTarget
(previously " +
- s"$oldNumExecutorsTarget) because not all requested executors are
actually needed")
+ } else {
+ val updatesNeeded = new mutable.HashMap[Int,
ExecutorAllocationManager.TargetNumUpdates]
+
+ // Update targets for all ResourceProfiles then do a single request to
the cluster manager
+ numExecutorsTargetPerResourceProfileId.foreach { case (rpId,
targetExecs) =>
+ val maxNeeded = maxNumExecutorsNeededPerResourceProfile(rpId)
+ if (maxNeeded < targetExecs) {
+ // The target number exceeds the number we actually need, so stop
adding new
+ // executors and inform the cluster manager to cancel the extra
pending requests
+
+ // We lower the target number of executors but don't actively kill
any yet. Killing is
+ // controlled separately by an idle timeout. It's still helpful to
reduce
+ // the target number in case an executor just happens to get lost
(eg., bad hardware,
+ // or the cluster manager preempts it) -- in that case, there is no
point in trying
+ // to immediately get a new executor, since we wouldn't even use it
yet.
+ decrementExecutorsFromTarget(maxNeeded, rpId, updatesNeeded)
+ } else if (addTime != NOT_SET && now >= addTime) {
+ addExecutorsToTarget(maxNeeded, rpId, updatesNeeded)
+ }
+ }
+ doUpdateRequest(updatesNeeded.toMap, now)
+ }
+ }
+
+ private def addExecutorsToTarget(
+ maxNeeded: Int,
+ rpId: Int,
+ updatesNeeded: mutable.HashMap[Int,
ExecutorAllocationManager.TargetNumUpdates]
+ ): Int = {
+ updateTargetExecs(addExecutors, maxNeeded, rpId, updatesNeeded)
+ }
+
+ private def decrementExecutorsFromTarget(
+ maxNeeded: Int,
+ rpId: Int,
+ updatesNeeded: mutable.HashMap[Int,
ExecutorAllocationManager.TargetNumUpdates]
+ ): Int = {
+ updateTargetExecs(decrementExecutors, maxNeeded, rpId, updatesNeeded)
+ }
+
+ private def updateTargetExecs(
+ updateTargetFn: (Int, Int) => Int,
+ maxNeeded: Int,
+ rpId: Int,
+ updatesNeeded: mutable.HashMap[Int,
ExecutorAllocationManager.TargetNumUpdates]
+ ): Int = {
+ val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
+ // update the target number (add or remove)
+ val delta = updateTargetFn(maxNeeded, rpId)
+ if (delta != 0) {
+ updatesNeeded(rpId) = ExecutorAllocationManager.TargetNumUpdates(delta,
oldNumExecutorsTarget)
+ }
+ delta
+ }
+
+ private def doUpdateRequest(
+ updates: Map[Int, ExecutorAllocationManager.TargetNumUpdates],
+ now: Long): Int = {
+ // Only call cluster manager if target has changed.
+ if (updates.size > 0) {
+ val requestAcknowledged = try {
+ logDebug("requesting updates: " + updates)
+ testing ||
+ client.requestTotalExecutors(
+ numExecutorsTargetPerResourceProfileId.toMap,
+ numLocalityAwareTasksPerResourceProfileId.toMap,
+ rpIdToHostToLocalTaskCount)
+ } catch {
+ case NonFatal(e) =>
+ // Use INFO level so the error it doesn't show up by default in
shells.
+ // Errors here are more commonly caused by YARN AM restarts, which
is a recoverable
+ // issue, and generate a lot of noisy output.
+ logInfo("Error reaching cluster manager.", e)
+ false
+ }
+ if (requestAcknowledged) {
+ // have to go through all resource profiles that changed
+ var totalDelta = 0
+ updates.foreach { case (rpId, targetNum) =>
+ val delta = targetNum.delta
+ totalDelta += delta
+ if (delta > 0) {
+ val executorsString = "executor" + { if (delta > 1) "s" else "" }
+ logInfo(s"Requesting $delta new $executorsString because tasks are
backlogged " +
+ s"(new desired total will be
${numExecutorsTargetPerResourceProfileId(rpId)} " +
+ s"for resource profile id: ${rpId})")
+ numExecutorsToAddPerResourceProfileId(rpId) =
+ if (delta == numExecutorsToAddPerResourceProfileId(rpId)) {
+ numExecutorsToAddPerResourceProfileId(rpId) * 2
+ } else {
+ 1
+ }
+ logDebug(s"Starting timer to add more executors (to " +
+ s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
+ addTime = now +
TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
+ } else {
+ logDebug(s"Lowering target number of executors to" +
+ s" ${numExecutorsTargetPerResourceProfileId(rpId)} (previously "
+
+ s"$targetNum.oldNumExecutorsTarget for resource profile id:
${rpId}) " +
+ "because not all requested executors " +
+ "are actually needed")
+ }
+ }
+ totalDelta
+ } else {
+ // request was for all profiles so we have to go through all to reset
to old num
+ updates.foreach { case (rpId, targetNum) =>
+ logWarning(
+ s"Unable to reach the cluster manager to request more executors!")
Review comment:
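nit: the `s` string-interpolator prefix is not necessary on this literal, because the message contains no interpolated (`$...`) expressions; a plain string literal is enough.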
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]