Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22209#discussion_r214372050
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
       }
     
    +  test("SPARK-24415: update metrics for tasks that finish late") {
    +    val listener = new AppStatusListener(store, conf, true)
    +
    +    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    +    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
    +
    +    // Start job
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
    +
    +    // Start 2 stages
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
    +
    +    // Start 2 Tasks
    +    val tasks = createTasks(2, Array("1"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
    +    }
    +
    +    // Task 1 Finished
    +    time += 1
    +    tasks(0).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
    +
    +    // Stage 1 Completed
    +    stage1.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
    +
    +    // Stop job 1
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
    +
    +    // Task 2 Killed
    +    time += 1
    +    tasks(1).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
    +        TaskKilled(reason = "Killed"), tasks(1), null))
    +
    +    // Ensure killed task metrics are updated
    +    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
    +    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
    +    assert(failedStages.size == 1)
    +    assert(failedStages.head.numKilledTasks == 1)
    +    assert(failedStages.head.numCompleteTasks == 1)
    +
    +    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
    +    assert(allJobs.size == 1)
    --- End diff --
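
    For context, the job-level assertions under discussion presumably continue along these lines; this is my illustration, not part of the quoted diff (the JobData field names numCompletedTasks/numKilledTasks are assumptions based on the v1 API):

        // Hypothetical continuation of the quoted test: the single job should
        // also count the task that was killed after its stage completed.
        assert(allJobs.head.numCompletedTasks == 1)
        assert(allJobs.head.numKilledTasks == 1)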
    
    In this PR, I am only trying to fix issues caused by out-of-order events, leaving aside the ones caused by dropped events.
    
    Given that, the job will always be updated regardless of whether the task completion event, the stage completion event, or the job completion event is received last (because of the weirdness you mentioned above). Please let me know if that is not correct.
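
    To make the ordering argument concrete, here is a minimal, self-contained toy sketch (all names hypothetical, not Spark's actual implementation) of why the counters come out the same no matter which event arrives last:

        object OutOfOrderSketch {
          sealed trait Event
          final case class TaskEnd(killed: Boolean) extends Event
          case object StageCompleted extends Event
          case object JobEnd extends Event

          // Mutable counters standing in for the job's task metrics.
          final class JobState {
            var numCompletedTasks = 0
            var numKilledTasks = 0
          }

          // Task-end events update the counters unconditionally, even when the
          // owning stage or the job has already completed; stage/job completion
          // never recomputes them, so arrival order cannot change the totals.
          def handle(job: JobState, event: Event): Unit = event match {
            case TaskEnd(killed) =>
              if (killed) job.numKilledTasks += 1 else job.numCompletedTasks += 1
            case StageCompleted | JobEnd => ()
          }

          def main(args: Array[String]): Unit = {
            val job = new JobState
            // Same out-of-order sequence as the test above: one task finishes,
            // the stage and job complete, then a second task is killed late.
            Seq(TaskEnd(killed = false), StageCompleted, JobEnd, TaskEnd(killed = true))
              .foreach(handle(job, _))
            assert(job.numCompletedTasks == 1 && job.numKilledTasks == 1)
          }
        }

    The key point is that the TaskEnd branch never consults stage or job status, which is the behavior I am relying on above.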
    
    I will create a separate JIRA to handle dropped events.

