Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/21653#discussion_r204177708 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -723,6 +723,21 @@ private[spark] class TaskSetManager( def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { val info = taskInfos(tid) val index = info.index + // Check if any other attempt succeeded before this and this attempt has not been handled + if (successful(index) && killedByOtherAttempt.contains(tid)) { + calculatedTasks -= 1 + + val resultSizeAcc = result.accumUpdates.find(a => + a.name == Some(InternalAccumulator.RESULT_SIZE)) + if (resultSizeAcc.isDefined) { + totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value --- End diff -- The downside here is that we have already incremented the total, so other tasks could have checked it and failed before we decrement it. But unless someone else has a better idea, this is better than what we have now.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org