Github user cxzl25 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21656#discussion_r200236204
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -772,6 +772,12 @@ private[spark] class TaskSetManager(
       private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
         partitionToIndex.get(partitionId).foreach { index =>
           if (!successful(index)) {
    +        if (speculationEnabled) {
    +          taskAttempts(index).headOption.map { info =>
    +            info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    +            successfulTaskDurations.insert(info.duration)
    --- End diff --
    
    TaskSetManager#handleSuccessfulTask records the duration of each successful task
    by inserting it into successfulTaskDurations.
    
    When there are multiple tasksets for this stage, markPartitionCompletedInAllTaskSets
    also increments tasksSuccessful (through markPartitionCompleted), but without
    inserting a duration into successfulTaskDurations.
    
    In this case, when checkSpeculatableTasks is called, tasksSuccessful satisfies the
    condition, but successfulTaskDurations is empty, so there is no median to compute.
    
    
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L723
    ```scala
      def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
        val info = taskInfos(tid)
        val index = info.index
        info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
        if (speculationEnabled) {
          successfulTaskDurations.insert(info.duration)
        }
        // ...
        // There may be multiple tasksets for this stage -- we let all of them know that the partition
        // was completed.  This may result in some of the tasksets getting completed.
        sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
    ```
    
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L987
    ```scala
    override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
      // ...
      if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
        val time = clock.getTimeMillis()
        val medianDuration = successfulTaskDurations.median
    ```
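
    The mismatch described above can be reproduced in isolation with a small model.
    This is only a sketch: Durations and TaskSetModel are hypothetical stand-ins, not
    Spark classes, and throwing on an empty collection is an assumption made here to
    make the failure visible. The flag recordDurationOnPartitionCompleted mimics the
    change in the diff.

    ```scala
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Try

    // Hypothetical stand-in for the duration collection; not Spark's implementation.
    final class Durations {
      private val values = ArrayBuffer.empty[Long]

      def insert(d: Long): Unit = values += d

      // Assumption for this sketch: asking for the median of nothing is an error.
      def median: Double = {
        if (values.isEmpty) throw new NoSuchElementException("no durations recorded")
        val sorted = values.sorted
        val n = sorted.length
        if (n % 2 == 1) sorted(n / 2).toDouble
        else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
      }
    }

    // Hypothetical, heavily simplified model of one taskset's bookkeeping.
    final class TaskSetModel(recordDurationOnPartitionCompleted: Boolean) {
      var tasksSuccessful = 0
      val successfulTaskDurations = new Durations

      // Path taken when one of this taskset's own tasks succeeds.
      def handleSuccessfulTask(duration: Long): Unit = {
        successfulTaskDurations.insert(duration)
        tasksSuccessful += 1
      }

      // Path taken when another taskset for the same stage completed the partition.
      def markPartitionCompleted(duration: Long): Unit = {
        if (recordDurationOnPartitionCompleted) successfulTaskDurations.insert(duration)
        tasksSuccessful += 1
      }

      // Same shape as the condition quoted from checkSpeculatableTasks.
      def medianIfSpeculatable(minFinished: Int): Option[Double] =
        if (tasksSuccessful >= minFinished && tasksSuccessful > 0) {
          Some(successfulTaskDurations.median)
        } else {
          None
        }
    }

    object SpeculationMismatchDemo {
      def main(args: Array[String]): Unit = {
        // Counter advances, no duration recorded: the median lookup fails.
        val without = new TaskSetModel(recordDurationOnPartitionCompleted = false)
        without.markPartitionCompleted(100L)
        println(Try(without.medianIfSpeculatable(1)).isFailure)

        // Counter and durations stay in sync: the median is available.
        val withFix = new TaskSetModel(recordDurationOnPartitionCompleted = true)
        withFix.markPartitionCompleted(100L)
        println(withFix.medianIfSpeculatable(1))
      }
    }
    ```

    With the flag off, tasksSuccessful reaches the threshold while the duration
    collection is still empty; with the flag on, both are updated together.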


