GitHub user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6430#discussion_r31275205
  
    --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
    @@ -697,13 +697,65 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
         assert(numExecutorsTarget(manager) === 16)
       }
     
    -  private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
    +  test("avoid ramp down initial executors until first job is submitted") {
    +    sc = createSparkContext(2, 5, 3)
    +    val manager = sc.executorAllocationManager.get
    +    val clock = new ManualClock(10000L)
    +    manager.setClock(clock)
    +
    +    // Verify the initial number of executors
    +    assert(numExecutorsTarget(manager) === 3)
    +    schedule(manager)
    +    // Verify the initial number of executors is kept when there are no pending tasks
    +    assert(numExecutorsTarget(manager) === 3)
    +
    +    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
    +    clock.advance(100L)
    +
    +    assert(maxNumExecutorsNeeded(manager) === 2)
    +    schedule(manager)
    +
    +    // Verify that the current number of executors is ramped down once the first job is submitted
    +    assert(numExecutorsTarget(manager) === 2)
    +  }
    +
    +  test("avoid ramp down initial executors until idle executor is timeout") 
{
    +    sc = createSparkContext(2, 5, 3)
    +    val manager = sc.executorAllocationManager.get
    +    val clock = new ManualClock(10000L)
    +    manager.setClock(clock)
    +
    +    // Verify the initial number of executors
    +    assert(numExecutorsTarget(manager) === 3)
    +    schedule(manager)
    +    // Verify the initial number of executors is kept when there are no pending tasks
    +    assert(numExecutorsTarget(manager) === 3)
    +    (0 until 3).foreach { i =>
    +      onExecutorAdded(manager, s"executor-$i")
    +    }
    +
    +    clock.advance(executorIdleTimeout * 1000)
    +
    +    assert(maxNumExecutorsNeeded(manager) === 0)
    +    schedule(manager)
    +    // Verify the executors time out but numExecutorsTarget is not yet recalculated
    +    assert(numExecutorsTarget(manager) === 3)
    +
    +    // Schedule again to recalculate numExecutorsTarget after the executors time out
    +    schedule(manager)
    +    // Verify that the current number of executors is ramped down once the executors time out
    +    assert(numExecutorsTarget(manager) === 2)
    +  }
    +
    +  private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5,
    +      initialExecutors: Int = 1): SparkContext = {
    --- End diff ---
    
    @andrewor14 it's the other way around. During the initialization period, we never want to go below initialExecutors, but we're ok going above it.
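
    To make that bound concrete, here is a minimal sketch of the clamp being described. The names (InitialExecutorClamp, targetExecutors, maxNeeded, initializing) are illustrative stand-ins, not the PR's actual code:

        // Sketch only: approximates the floor/ceiling logic discussed above.
        object InitialExecutorClamp {
          def targetExecutors(
              maxNeeded: Int,          // demand from pending + running tasks
              initializing: Boolean,   // true until the first stage is submitted
              minExecutors: Int,
              maxExecutors: Int,
              initialExecutors: Int): Int = {
            // During initialization the floor is initialExecutors rather than
            // minExecutors: the target may ramp above it, but never below it.
            val floor = if (initializing) initialExecutors else minExecutors
            math.min(math.max(maxNeeded, floor), maxExecutors)
          }
        }

    Under this sketch, targetExecutors(maxNeeded = 0, initializing = true, minExecutors = 2, maxExecutors = 5, initialExecutors = 3) yields 3, matching the first test's assertion before any stage is submitted, and yields 2 once initializing becomes false and maxNeeded is 2.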

