squito commented on a change in pull request #24497: [SPARK-11334][CORE][FOLLOWUP] Fix bug in Executor allocation manager in running tasks calculation
URL: https://github.com/apache/spark/pull/24497#discussion_r280151247
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
 ##########
 @@ -281,8 +281,20 @@ class ExecutorAllocationManagerSuite
 
     post(sc.listenerBus, SparkListenerStageCompleted(stage))
 
+    val attemptStage = createStageInfo(stage.stageId, 5, attemptId = 1)
 
 Review comment:
  Can you add a comment here to explain the test at a high level? Also, a few things in the test below don't reflect a realistic situation. My suggestions:
   
  ```
  test("ignore task end events from completed stages") {
    sc = createSparkContext(0, 10, 0)
    val manager = sc.executorAllocationManager.get

    // We simulate having a stage fail, but with tasks still running. Then another attempt for
    // that stage is started, and we get task completions from the first stage attempt. Make sure
    // the value of `totalRunningTasks` is consistent as tasks finish from both attempts (we only
    // count tasks from the active attempt).
    val stage = createStageInfo(0, 5)
    post(sc.listenerBus, SparkListenerStageSubmitted(stage))
    val taskInfo1 = createTaskInfo(0, 0, "executor-1")
    val taskInfo2 = createTaskInfo(1, 1, "executor-1")
    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo2))

    // The first attempt finishes (e.g. with a fetch failure). The tasks haven't completed yet,
    // but we ignore the running tasks.
    post(sc.listenerBus, SparkListenerStageCompleted(stage))

    // XXX: Is this the right value here? Or do we want it to still be 2? There *are* still
    // two tasks running.
    assert(totalRunningTasks(manager) === 0)

    // Submit another attempt for the stage. We may get some task completions from the
    // first attempt, but they are ignored.
    val stageAttempt1 = createStageInfo(stage.stageId, 5, attemptId = 1)
    post(sc.listenerBus, SparkListenerStageSubmitted(stageAttempt1))
    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
    assert(totalRunningTasks(manager) === 0)
    val attemptTaskInfo1 = createTaskInfo(3, 0, "executor-1")
    val attemptTaskInfo2 = createTaskInfo(4, 1, "executor-1")
    post(sc.listenerBus, SparkListenerTaskStart(0, 1, attemptTaskInfo1))
    post(sc.listenerBus, SparkListenerTaskStart(0, 1, attemptTaskInfo2))
    assert(totalRunningTasks(manager) === 2)
    post(sc.listenerBus, SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo1, null))
    assert(totalRunningTasks(manager) === 1)
    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, taskInfo2, null))
    assert(totalRunningTasks(manager) === 1)
    post(sc.listenerBus, SparkListenerTaskEnd(0, 1, null, Success, attemptTaskInfo2, null))
    assert(totalRunningTasks(manager) === 0)
  }
  ```
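
  For context on why the counts above behave the way they do, here is a minimal, self-contained sketch of the attempt-aware accounting this test exercises. It is illustrative only: the names (`RunningTaskTracker`, `StageAttempt`, `liveStageAttempts`, `stageAttemptToNumRunningTask`) are assumptions for this sketch, not necessarily the actual `ExecutorAllocationManager` internals.

  ```
  import scala.collection.mutable

  // Key running-task counts by (stageId, attemptId), not stageId alone, so
  // events from a zombie attempt cannot corrupt the live attempt's count.
  case class StageAttempt(stageId: Int, stageAttemptId: Int)

  class RunningTaskTracker {
    private val stageAttemptToNumRunningTask = mutable.HashMap.empty[StageAttempt, Int]
    private val liveStageAttempts = mutable.HashSet.empty[StageAttempt]

    def onStageSubmitted(stageId: Int, attemptId: Int): Unit =
      liveStageAttempts += StageAttempt(stageId, attemptId)

    def onStageCompleted(stageId: Int, attemptId: Int): Unit = {
      val attempt = StageAttempt(stageId, attemptId)
      liveStageAttempts -= attempt
      // Drop the attempt's counter entirely: tasks still running in a failed
      // attempt no longer count toward executor demand.
      stageAttemptToNumRunningTask -= attempt
    }

    def onTaskStart(stageId: Int, attemptId: Int): Unit = {
      val attempt = StageAttempt(stageId, attemptId)
      if (liveStageAttempts.contains(attempt)) {
        stageAttemptToNumRunningTask(attempt) =
          stageAttemptToNumRunningTask.getOrElse(attempt, 0) + 1
      }
    }

    def onTaskEnd(stageId: Int, attemptId: Int): Unit = {
      val attempt = StageAttempt(stageId, attemptId)
      // Task-end events from completed (zombie) attempts are ignored: their
      // counter was already removed in onStageCompleted.
      stageAttemptToNumRunningTask.get(attempt).foreach { n =>
        if (n > 1) stageAttemptToNumRunningTask(attempt) = n - 1
        else stageAttemptToNumRunningTask -= attempt
      }
    }

    def totalRunningTasks: Int = stageAttemptToNumRunningTask.values.sum
  }
  ```

  Replaying the event sequence from the suggested test against this sketch yields `totalRunningTasks` of 0, 0, 2, 1, 1, 0 at the six assertion points.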

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

