Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21653#discussion_r204177708
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
       def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
         val info = taskInfos(tid)
         val index = info.index
    +    // Check if any other attempt succeeded before this and this attempt has not been handled
    +    if (successful(index) && killedByOtherAttempt.contains(tid)) {
    +      calculatedTasks -= 1
    +
    +      val resultSizeAcc = result.accumUpdates.find(a =>
    +        a.name == Some(InternalAccumulator.RESULT_SIZE))
    +      if (resultSizeAcc.isDefined) {
    +        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
    --- End diff --
    
    The downside here is that we already incremented, and other tasks could have checked and failed before we decrement; but unless someone else has a better idea, this is better than what we have now.
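    A minimal sketch of the window described above, in Java with hypothetical names (not Spark's actual classes): the shared size counter is bumped by a duplicate attempt, a concurrent limit check can observe the inflated total and fail, and only afterwards does the rollback decrement land.

    ```java
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the increment / concurrent-check / late-decrement race.
    // MAX_RESULT_SIZE plays the role of spark.driver.maxResultSize.
    public class ResultSizeRace {
        static final long MAX_RESULT_SIZE = 100;
        final AtomicLong totalResultSize = new AtomicLong(0);

        // Hypothetical simplification of the driver-side size check:
        // add the result's size, then test against the limit.
        boolean tryAddResult(long size) {
            return totalResultSize.addAndGet(size) <= MAX_RESULT_SIZE;
        }

        // The decrement the patch adds for an already-handled duplicate attempt.
        void rollback(long size) {
            totalResultSize.addAndGet(-size);
        }

        public static void main(String[] args) {
            ResultSizeRace r = new ResultSizeRace();
            boolean first = r.tryAddResult(80);   // original attempt fits: 80 <= 100
            boolean second = r.tryAddResult(80);  // duplicate attempt: 160 > 100, check fails
            // Any task checking at this point sees the inflated total;
            // the rollback below comes too late to undo that observation.
            r.rollback(80);
            System.out.println(first + " " + second + " " + r.totalResultSize.get());
        }
    }
    ```

    The counter itself stays consistent after the rollback; the problem is purely the transient state another checker can act on in between.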


---
