Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/21653#discussion_r204199580
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -723,6 +723,21 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
+ // Check if any other attempt succeeded before this and this attempt has not been handled
+ if (successful(index) && killedByOtherAttempt.contains(tid)) {
+ calculatedTasks -= 1
+
+ val resultSizeAcc = result.accumUpdates.find(a =>
+ a.name == Some(InternalAccumulator.RESULT_SIZE))
+ if (resultSizeAcc.isDefined) {
+ totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
--- End diff --
I agree, I don't see a better option.
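
For context, the pattern in the diff above — locating a named internal accumulator among a task's accumulator updates and subtracting its value from a running total — can be sketched standalone. This is a simplified illustration, not Spark's actual classes: `LongAccum` is a hypothetical stand-in for Spark's `LongAccumulator`, and the accumulator name constant is assumed.

```scala
// Hypothetical sketch of the diff's bookkeeping pattern; LongAccum and the
// name constant are stand-ins, not Spark's real AccumulatorV2 hierarchy.
object ResultSizeSketch {
  // Simplified stand-in for a named long-valued accumulator.
  case class LongAccum(name: Option[String], value: Long)

  // Assumed value of InternalAccumulator.RESULT_SIZE.
  val RESULT_SIZE = "internal.metrics.resultSize"

  // Mirrors the diff: find the result-size accumulator by name and, if
  // present, subtract its value from the running total; otherwise leave
  // the total unchanged.
  def subtractResultSize(totalResultSize: Long,
                         accumUpdates: Seq[LongAccum]): Long =
    accumUpdates.find(_.name == Some(RESULT_SIZE)) match {
      case Some(acc) => totalResultSize - acc.value
      case None      => totalResultSize
    }

  def main(args: Array[String]): Unit = {
    val updates = Seq(
      LongAccum(Some(RESULT_SIZE), 1024L),
      LongAccum(Some("other.metric"), 7L))
    println(subtractResultSize(4096L, updates)) // prints 3072
  }
}
```

The `Option`-returning `find` plus a `match` avoids the `isDefined`/`get` pair used in the diff; behavior is the same either way.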
---