Github user sryza commented on a diff in the pull request:
https://github.com/apache/spark/pull/6430#discussion_r31275205
--- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -697,13 +697,65 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
assert(numExecutorsTarget(manager) === 16)
}
- private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
+ test("avoid ramp down initial executors until first job is submitted") {
+ sc = createSparkContext(2, 5, 3)
+ val manager = sc.executorAllocationManager.get
+ val clock = new ManualClock(10000L)
+ manager.setClock(clock)
+
+ // Verify the initial number of executors
+ assert(numExecutorsTarget(manager) === 3)
+ schedule(manager)
+ // Verify the initial number of executors is kept when there are no pending tasks
+ assert(numExecutorsTarget(manager) === 3)
+
+ sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
+ clock.advance(100L)
+
+ assert(maxNumExecutorsNeeded(manager) === 2)
+ schedule(manager)
+
+ // Verify that the current number of executors is ramped down once the first job is submitted
+ assert(numExecutorsTarget(manager) === 2)
+ }
+
+ test("avoid ramp down initial executors until idle executor is timeout")
{
+ sc = createSparkContext(2, 5, 3)
+ val manager = sc.executorAllocationManager.get
+ val clock = new ManualClock(10000L)
+ manager.setClock(clock)
+
+ // Verify the initial number of executors
+ assert(numExecutorsTarget(manager) === 3)
+ schedule(manager)
+ // Verify the initial number of executors is kept when there are no pending tasks
+ assert(numExecutorsTarget(manager) === 3)
+ (0 until 3).foreach { i =>
+ onExecutorAdded(manager, s"executor-$i")
+ }
+
+ clock.advance(executorIdleTimeout * 1000)
+
+ assert(maxNumExecutorsNeeded(manager) === 0)
+ schedule(manager)
+ // Verify the executors have timed out but numExecutorsTarget is not yet recalculated
+ assert(numExecutorsTarget(manager) === 3)
+
+ // Schedule again to recalculate numExecutorsTarget after the executors time out
+ schedule(manager)
+ // Verify that the current number of executors is ramped down once the executors time out
+ assert(numExecutorsTarget(manager) === 2)
+ }
+
+ private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5,
+ initialExecutors: Int = 1): SparkContext = {
--- End diff ---
@andrewor14 it's the other way around. During the initialization period,
we never want to go below initialExecutors, but we're ok going above it.
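
To make that concrete, here is a minimal Scala sketch of the clamping behavior described above. This is not the actual ExecutorAllocationManager code; the object, method, and parameter names are illustrative only.

    // Hypothetical sketch -- not the actual ExecutorAllocationManager implementation.
    object InitialTargetSketch {
      // During the initialization period the target never drops below
      // initialExecutors, but it may go above it when more executors are needed.
      def targetExecutors(
          initializing: Boolean,
          initialExecutors: Int,
          minExecutors: Int,
          maxExecutors: Int,
          executorsNeeded: Int): Int = {
        val floor = if (initializing) math.max(minExecutors, initialExecutors) else minExecutors
        math.min(maxExecutors, math.max(floor, executorsNeeded))
      }

      def main(args: Array[String]): Unit = {
        // initializing = true, initial = 3, min = 2, max = 5, needed = 0: stay at 3
        assert(targetExecutors(true, 3, 2, 5, 0) == 3)
        // initializing = true, needed = 4: ramping above the initial target is fine
        assert(targetExecutors(true, 3, 2, 5, 4) == 4)
        // initializing = false (first job submitted), needed = 2: ramp down to 2
        assert(targetExecutors(false, 3, 2, 5, 2) == 2)
      }
    }

With the values used in the tests above (initial = 3, min = 2, max = 5), the target stays at 3 while initializing with no pending tasks and only drops to 2 once demand is known.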