Repository: spark Updated Branches: refs/heads/branch-1.3 d7269493d -> 98ac39d2f
[SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative number of executors Author: Sandy Ryza <sandy@cloudera.com> Closes #5704 from sryza/sandy-spark-6954 and squashes the following commits: b7890fb [Sandy Ryza] Avoid ramping up to an existing number of executors 6eb516a [Sandy Ryza] SPARK-6954. ExecutorAllocationManager can end up requesting a negative number of executors Author: Sandy Ryza <sa...@cloudera.com> Closes #5856 from sryza/sandy-backport-6954 and squashes the following commits: 1cb517a [Sandy Ryza] [SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative n... Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98ac39d2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98ac39d2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98ac39d2 Branch: refs/heads/branch-1.3 Commit: 98ac39d2f5828fbdad8c9a4e563ad1169e3b9948 Parents: d726949 Author: Sandy Ryza <sa...@cloudera.com> Authored: Sat May 2 10:59:07 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Sat May 2 10:59:07 2015 +0100 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 101 ++++++----- .../spark/ExecutorAllocationManagerSuite.scala | 171 +++++++++++-------- 2 files changed, 148 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/98ac39d2/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 9385f55..adbd247 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -27,13 +27,20 @@ import org.apache.spark.util.{Clock, SystemClock, Utils} /** * An agent that dynamically allocates and removes executors based on the workload. * - * The add policy depends on whether there are backlogged tasks waiting to be scheduled. If - * the scheduler queue is not drained in N seconds, then new executors are added. If the queue - * persists for another M seconds, then more executors are added and so on. The number added - * in each round increases exponentially from the previous round until an upper bound on the - * number of executors has been reached. The upper bound is based both on a configured property - * and on the number of tasks pending: the policy will never increase the number of executor - * requests past the number needed to handle all pending tasks. + * The ExecutorAllocationManager maintains a moving target number of executors which is periodically + * synced to the cluster manager. The target starts at a configured initial value and changes with + * the number of pending and running tasks. + * + * Decreasing the target number of executors happens when the current target is more than needed to + * handle the current load. The target number of executors is always truncated to the number of + * executors that could run all current running and pending tasks at once. + * + * Increasing the target number of executors happens in response to backlogged tasks waiting to be + * scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If + * the queue persists for another M seconds, then more executors are added and so on. The number + * added in each round increases exponentially from the previous round until an upper bound has been + * reached. The upper bound is based both on a configured property and on the current number of + * running and pending tasks, as described above. 
* * The rationale for the exponential increase is twofold: (1) Executors should be added slowly * in the beginning in case the number of extra executors needed turns out to be small. Otherwise, @@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager( // Number of executors to add in the next round private var numExecutorsToAdd = 1 - // Number of executors that have been requested but have not registered yet - private var numExecutorsPending = 0 + // The desired number of executors at this moment in time. If all our executors were to die, this + // is the number of executors we would immediately want from the cluster manager. + private var numExecutorsTarget = + conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors) // Executors that have been requested to be removed but have not been killed yet private val executorsPendingToRemove = new mutable.HashSet[String] @@ -200,13 +209,6 @@ private[spark] class ExecutorAllocationManager( } /** - * The number of executors we would have if the cluster manager were to fulfill all our existing - * requests. - */ - private def targetNumExecutors(): Int = - numExecutorsPending + executorIds.size - executorsPendingToRemove.size - - /** * The maximum number of executors we would need under the current load to satisfy all running * and pending tasks, rounded up. */ @@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager( private def schedule(): Unit = synchronized { val now = clock.getTimeMillis - addOrCancelExecutorRequests(now) + updateAndSyncNumExecutorsTarget(now) removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime @@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager( } /** + * Updates our target number of executors and syncs the result with the cluster manager. + * * Check to see whether our existing allocation and the requests we've made previously exceed our - * current needs. 
If so, let the cluster manager know so that it can cancel pending requests that - * are unneeded. + * current needs. If so, truncate our target and let the cluster manager know so that it can + * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @return the delta in the target number of executors. */ - private def addOrCancelExecutorRequests(now: Long): Int = synchronized { - val currentTarget = targetNumExecutors + private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized { val maxNeeded = maxNumExecutorsNeeded - if (maxNeeded < currentTarget) { + if (maxNeeded < numExecutorsTarget) { // The target number exceeds the number we actually need, so stop adding new - // executors and inform the cluster manager to cancel the extra pending requests. - val newTotalExecutors = math.max(maxNeeded, minNumExecutors) - client.requestTotalExecutors(newTotalExecutors) + // executors and inform the cluster manager to cancel the extra pending requests + val oldNumExecutorsTarget = numExecutorsTarget + numExecutorsTarget = math.max(maxNeeded, minNumExecutors) + client.requestTotalExecutors(numExecutorsTarget) numExecutorsToAdd = 1 - updateNumExecutorsPending(newTotalExecutors) + numExecutorsTarget - oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) { val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " + @@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager( */ private def addExecutors(maxNumExecutorsNeeded: Int): Int = { // Do not request more executors if it would put our target over the upper bound - val currentTarget = targetNumExecutors - if (currentTarget >= maxNumExecutors) { - logDebug(s"Not adding executors because there are already ${executorIds.size} " + - s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)") + if (numExecutorsTarget >= 
maxNumExecutors) { + val numExecutorsPending = numExecutorsTarget - executorIds.size + logDebug(s"Not adding executors because there are already ${executorIds.size} registered " + + s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)") numExecutorsToAdd = 1 return 0 } - val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded) - val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors) - val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors) + val oldNumExecutorsTarget = numExecutorsTarget + // There's no point in wasting time ramping up to the number of executors we already have, so + // make sure our target is at least as much as our current allocation: + numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size) + // Boost our target with the number to add for this round: + numExecutorsTarget += numExecutorsToAdd + // Ensure that our target doesn't exceed what we need at the present moment: + numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded) + // Ensure that our target fits within configured bounds: + numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors) + + val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget) if (addRequestAcknowledged) { - val delta = updateNumExecutorsPending(newTotalExecutors) + val delta = numExecutorsTarget - oldNumExecutorsTarget logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" + - s" (new desired total will be $newTotalExecutors)") + s" (new desired total will be $numExecutorsTarget)") numExecutorsToAdd = if (delta == numExecutorsToAdd) { numExecutorsToAdd * 2 } else { @@ -304,24 +317,12 @@ private[spark] class ExecutorAllocationManager( delta } else { logWarning( - s"Unable to reach the cluster manager to request $newTotalExecutors total executors!") + s"Unable to reach the cluster manager to request 
$numExecutorsTarget total executors!") 0 } } /** - * Given the new target number of executors, update the number of pending executor requests, - * and return the delta from the old number of pending requests. - */ - private def updateNumExecutorsPending(newTotalExecutors: Int): Int = { - val newNumExecutorsPending = - newTotalExecutors - executorIds.size + executorsPendingToRemove.size - val delta = newNumExecutorsPending - numExecutorsPending - numExecutorsPending = newNumExecutorsPending - delta - } - - /** * Request the cluster manager to remove the given executor. * Return whether the request is received. */ @@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager( // as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951) executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle) logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})") - if (numExecutorsPending > 0) { - numExecutorsPending -= 1 - logDebug(s"Decremented number of pending executors ($numExecutorsPending left)") - } } else { logWarning(s"Duplicate executor $executorId has registered") } http://git-wip-us.apache.org/repos/asf/spark/blob/98ac39d2/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 3ded1e4..114c8d4 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -84,7 +84,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit test("starting state") { sc = createSparkContext() val manager = sc.executorAllocationManager.get - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 1) 
assert(executorsPendingToRemove(manager).isEmpty) assert(executorIds(manager).isEmpty) assert(addTime(manager) === ExecutorAllocationManager.NOT_SET) @@ -97,108 +97,108 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 1) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 1) + assert(numExecutorsTarget(manager) === 2) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 2) - assert(numExecutorsPending(manager) === 3) + assert(numExecutorsTarget(manager) === 4) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 4) - assert(numExecutorsPending(manager) === 7) + assert(numExecutorsTarget(manager) === 8) assert(numExecutorsToAdd(manager) === 8) - assert(addExecutors(manager) === 3) // reached the limit of 10 - assert(numExecutorsPending(manager) === 10) + assert(addExecutors(manager) === 2) // reached the limit of 10 + assert(numExecutorsTarget(manager) === 10) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 0) - assert(numExecutorsPending(manager) === 10) + assert(numExecutorsTarget(manager) === 10) assert(numExecutorsToAdd(manager) === 1) // Register previously requested executors onExecutorAdded(manager, "first") - assert(numExecutorsPending(manager) === 9) + assert(numExecutorsTarget(manager) === 10) onExecutorAdded(manager, "second") onExecutorAdded(manager, "third") onExecutorAdded(manager, "fourth") - assert(numExecutorsPending(manager) === 6) + assert(numExecutorsTarget(manager) === 10) onExecutorAdded(manager, "first") // duplicates should not count onExecutorAdded(manager, "second") - assert(numExecutorsPending(manager) === 6) + assert(numExecutorsTarget(manager) === 10) // Try adding 
again // This should still fail because the number pending + running is still at the limit assert(addExecutors(manager) === 0) - assert(numExecutorsPending(manager) === 6) + assert(numExecutorsTarget(manager) === 10) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 0) - assert(numExecutorsPending(manager) === 6) + assert(numExecutorsTarget(manager) === 10) assert(numExecutorsToAdd(manager) === 1) } test("add executors capped by num pending tasks") { - sc = createSparkContext(1, 10) + sc = createSparkContext(0, 10) val manager = sc.executorAllocationManager.get sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) // Verify that we're capped at number of tasks in the stage - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 1) + assert(numExecutorsTarget(manager) === 1) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 2) - assert(numExecutorsPending(manager) === 3) + assert(numExecutorsTarget(manager) === 3) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 2) - assert(numExecutorsPending(manager) === 5) + assert(numExecutorsTarget(manager) === 5) assert(numExecutorsToAdd(manager) === 1) - // Verify that running a task reduces the cap + // Verify that running a task doesn't affect the target sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) sc.listenerBus.postToAll(SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) - assert(numExecutorsPending(manager) === 4) + assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 5) + assert(numExecutorsTarget(manager) === 6) assert(numExecutorsToAdd(manager) === 2) 
assert(addExecutors(manager) === 2) - assert(numExecutorsPending(manager) === 7) + assert(numExecutorsTarget(manager) === 8) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 0) - assert(numExecutorsPending(manager) === 7) + assert(numExecutorsTarget(manager) === 8) assert(numExecutorsToAdd(manager) === 1) - // Verify that re-running a task doesn't reduce the cap further + // Verify that re-running a task doesn't blow things up sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 8) + assert(numExecutorsTarget(manager) === 9) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 9) + assert(numExecutorsTarget(manager) === 10) assert(numExecutorsToAdd(manager) === 1) // Verify that running a task once we're at our limit doesn't blow things up sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) assert(addExecutors(manager) === 0) - assert(numExecutorsPending(manager) === 9) + assert(numExecutorsTarget(manager) === 10) } test("cancel pending executors when no longer needed") { - sc = createSparkContext(1, 10) + sc = createSparkContext(0, 10) val manager = sc.executorAllocationManager.get sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 1) - assert(numExecutorsPending(manager) === 1) + assert(numExecutorsTarget(manager) === 1) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 2) - assert(numExecutorsPending(manager) === 3) + 
assert(numExecutorsTarget(manager) === 3) val task1Info = createTaskInfo(0, 0, "executor-1") sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info)) @@ -272,7 +272,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit // Add a few executors assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) - assert(addExecutors(manager) === 4) onExecutorAdded(manager, "1") onExecutorAdded(manager, "2") onExecutorAdded(manager, "3") @@ -280,55 +279,57 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit onExecutorAdded(manager, "5") onExecutorAdded(manager, "6") onExecutorAdded(manager, "7") - assert(executorIds(manager).size === 7) + onExecutorAdded(manager, "8") + assert(executorIds(manager).size === 8) // Remove until limit assert(removeExecutor(manager, "1")) assert(removeExecutor(manager, "2")) - assert(!removeExecutor(manager, "3")) // lower limit reached - assert(!removeExecutor(manager, "4")) + assert(removeExecutor(manager, "3")) + assert(!removeExecutor(manager, "4")) // lower limit reached + assert(!removeExecutor(manager, "5")) onExecutorRemoved(manager, "1") onExecutorRemoved(manager, "2") + onExecutorRemoved(manager, "3") assert(executorIds(manager).size === 5) // Add until limit - assert(addExecutors(manager) === 5) // upper limit reached + assert(addExecutors(manager) === 2) // upper limit reached assert(addExecutors(manager) === 0) - assert(!removeExecutor(manager, "3")) // still at lower limit - assert(!removeExecutor(manager, "4")) - onExecutorAdded(manager, "8") + assert(!removeExecutor(manager, "4")) // still at lower limit + assert(!removeExecutor(manager, "5")) onExecutorAdded(manager, "9") onExecutorAdded(manager, "10") onExecutorAdded(manager, "11") onExecutorAdded(manager, "12") + onExecutorAdded(manager, "13") assert(executorIds(manager).size === 10) // Remove succeeds again, now that we are no longer at the lower limit - assert(removeExecutor(manager, "3")) 
assert(removeExecutor(manager, "4")) assert(removeExecutor(manager, "5")) assert(removeExecutor(manager, "6")) + assert(removeExecutor(manager, "7")) assert(executorIds(manager).size === 10) - assert(addExecutors(manager) === 1) - onExecutorRemoved(manager, "3") + assert(addExecutors(manager) === 0) onExecutorRemoved(manager, "4") + onExecutorRemoved(manager, "5") assert(executorIds(manager).size === 8) - // Add succeeds again, now that we are no longer at the upper limit - // Number of executors added restarts at 1 - assert(addExecutors(manager) === 2) - assert(addExecutors(manager) === 1) // upper limit reached + // Number of executors pending restarts at 1 + assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 0) assert(executorIds(manager).size === 8) - onExecutorRemoved(manager, "5") onExecutorRemoved(manager, "6") - onExecutorAdded(manager, "13") + onExecutorRemoved(manager, "7") onExecutorAdded(manager, "14") + onExecutorAdded(manager, "15") assert(executorIds(manager).size === 8) assert(addExecutors(manager) === 0) // still at upper limit - onExecutorAdded(manager, "15") onExecutorAdded(manager, "16") + onExecutorAdded(manager, "17") assert(executorIds(manager).size === 10) + assert(numExecutorsTarget(manager) === 10) } test("starting/canceling add timer") { @@ -411,33 +412,33 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit } test("mock polling loop with no events") { - sc = createSparkContext(1, 20) + sc = createSparkContext(0, 20) val manager = sc.executorAllocationManager.get val clock = new ManualClock(2020L) manager.setClock(clock) // No events - we should not be adding or removing - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(executorsPendingToRemove(manager).isEmpty) schedule(manager) - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(executorsPendingToRemove(manager).isEmpty) clock.advance(100L) 
schedule(manager) - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(executorsPendingToRemove(manager).isEmpty) clock.advance(1000L) schedule(manager) - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(executorsPendingToRemove(manager).isEmpty) clock.advance(10000L) schedule(manager) - assert(numExecutorsPending(manager) === 0) + assert(numExecutorsTarget(manager) === 0) assert(executorsPendingToRemove(manager).isEmpty) } test("mock polling loop add behavior") { - sc = createSparkContext(1, 20) + sc = createSparkContext(0, 20) val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) @@ -447,43 +448,43 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit onSchedulerBacklogged(manager) clock.advance(schedulerBacklogTimeout * 1000 / 2) schedule(manager) - assert(numExecutorsPending(manager) === 0) // timer not exceeded yet + assert(numExecutorsTarget(manager) === 0) // timer not exceeded yet clock.advance(schedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 1) // first timer exceeded + assert(numExecutorsTarget(manager) === 1) // first timer exceeded clock.advance(sustainedSchedulerBacklogTimeout * 1000 / 2) schedule(manager) - assert(numExecutorsPending(manager) === 1) // second timer not exceeded yet + assert(numExecutorsTarget(manager) === 1) // second timer not exceeded yet clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 1 + 2) // second timer exceeded + assert(numExecutorsTarget(manager) === 1 + 2) // second timer exceeded clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 1 + 2 + 4) // third timer exceeded + assert(numExecutorsTarget(manager) === 1 + 2 + 4) // third timer exceeded // Scheduler queue drained 
onSchedulerQueueEmpty(manager) clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 7) // timer is canceled + assert(numExecutorsTarget(manager) === 7) // timer is canceled clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 7) + assert(numExecutorsTarget(manager) === 7) // Scheduler queue backlogged again onSchedulerBacklogged(manager) clock.advance(schedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 7 + 1) // timer restarted + assert(numExecutorsTarget(manager) === 7 + 1) // timer restarted clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 7 + 1 + 2) + assert(numExecutorsTarget(manager) === 7 + 1 + 2) clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 7 + 1 + 2 + 4) + assert(numExecutorsTarget(manager) === 7 + 1 + 2 + 4) clock.advance(sustainedSchedulerBacklogTimeout * 1000) schedule(manager) - assert(numExecutorsPending(manager) === 20) // limit reached + assert(numExecutorsTarget(manager) === 20) // limit reached } test("mock polling loop remove behavior") { @@ -677,6 +678,31 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit assert(!removeTimes(manager).contains("executor-1")) } + test("avoid ramp up when target < running executors") { + sc = createSparkContext(0, 100000) + val manager = sc.executorAllocationManager.get + val stage1 = createStageInfo(0, 1000) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + + assert(addExecutors(manager) === 1) + assert(addExecutors(manager) === 2) + assert(addExecutors(manager) === 4) + assert(addExecutors(manager) === 8) + assert(numExecutorsTarget(manager) === 15) + (0 until 15).foreach { i => + onExecutorAdded(manager, s"executor-$i") + } + assert(executorIds(manager).size === 15) + 
sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + + adjustRequestedExecutors(manager) + assert(numExecutorsTarget(manager) === 0) + + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + addExecutors(manager) + assert(numExecutorsTarget(manager) === 16) + } + private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = { val conf = new SparkConf() .setMaster("local") @@ -718,7 +744,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { * ------------------------------------------------------- */ private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd) - private val _numExecutorsPending = PrivateMethod[Int]('numExecutorsPending) + private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget) private val _maxNumExecutorsNeeded = PrivateMethod[Int]('maxNumExecutorsNeeded) private val _executorsPendingToRemove = PrivateMethod[collection.Set[String]]('executorsPendingToRemove) @@ -727,7 +753,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _removeTimes = PrivateMethod[collection.Map[String, Long]]('removeTimes) private val _schedule = PrivateMethod[Unit]('schedule) private val _addExecutors = PrivateMethod[Int]('addExecutors) - private val _addOrCancelExecutorRequests = PrivateMethod[Int]('addOrCancelExecutorRequests) + private val _updateAndSyncNumExecutorsTarget = + PrivateMethod[Int]('updateAndSyncNumExecutorsTarget) private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor) private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded) private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved) @@ -740,8 +767,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { manager invokePrivate _numExecutorsToAdd() } - private def numExecutorsPending(manager: ExecutorAllocationManager): Int = { - manager invokePrivate _numExecutorsPending() + private 
def numExecutorsTarget(manager: ExecutorAllocationManager): Int = { + manager invokePrivate _numExecutorsTarget() } private def executorsPendingToRemove( @@ -771,7 +798,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { } private def adjustRequestedExecutors(manager: ExecutorAllocationManager): Int = { - manager invokePrivate _addOrCancelExecutorRequests(0L) + manager invokePrivate _updateAndSyncNumExecutorsTarget(0L) } private def removeExecutor(manager: ExecutorAllocationManager, id: String): Boolean = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org