GitHub user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22209#discussion_r215056633
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
       }
     
    +  test("SPARK-24415: update metrics for tasks that finish late") {
    +    val listener = new AppStatusListener(store, conf, true)
    +
    +    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    +    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
    +
    +    // Start job
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
    +
    +    // Start 2 stages
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
    +
    +    // Start 2 Tasks
    +    val tasks = createTasks(2, Array("1"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
    +    }
    +
    +    // Task 1 Finished
    +    time += 1
    +    tasks(0).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", 
Success, tasks(0), null))
    +
    +    // Stage 1 Completed
    +    stage1.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
    +
    +    // Stop job 1
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
    +
    +    // Task 2 Killed
    +    time += 1
    +    tasks(1).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
    +        TaskKilled(reason = "Killed"), tasks(1), null))
    +
    +    // Ensure killed task metrics are updated
    +    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
    +    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
    +    assert(failedStages.size == 1)
    +    assert(failedStages.head.numKilledTasks == 1)
    +    assert(failedStages.head.numCompleteTasks == 1)
    +
    +    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
    +    assert(allJobs.size == 1)
    --- End diff --
    
    Sorry, I couldn't respond on Friday.
    
    The code above has been replaced with `conditionalLiveUpdate(job, now, removeStage)` in my PR. This means that if the taskEnd event is the last event and the stage has already completed, the job is updated immediately. If it is the last event but the stage has not yet completed, then the subsequent onStageCompleted event updates the job immediately.
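    
    For context, a minimal sketch of the helper this refers to, assuming it delegates to the existing `liveUpdate`/`maybeUpdate` paths in `AppStatusListener` (the exact body in the PR may differ):
    
    ```scala
    // Sketch: when `condition` is true (here removeStage, i.e. the stage has
    // already completed), the entity is written to the store immediately;
    // otherwise the write may be deferred until the next rate-limited update.
    private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
      if (condition) {
        liveUpdate(entity, now)   // immediate write
      } else {
        maybeUpdate(entity, now)  // throttled write
      }
    }
    ```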

