beliefer commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428673442
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before
reaching here.
// Note: "result.value()" only deserializes the value when it's called at
the first time, so
// here "result.value()" just returns the value and won't block other
threads.
- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
result.accumUpdates,
- result.metricPeaks, info)
+
+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success,
result.value(),
+ result.accumUpdates, result.metricPeaks, info)
maybeFinishTaskSet()
}
+
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * completed task's TaskInfo object referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and
+   * only retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the
+   * TaskInfo object that corresponds to the completed task. We do this to release references to
+   * `TaskInfo.accumulables()`, as the TaskInfo objects held by this class are long-lived and
+   * have a heavy memory footprint on the driver.
+   *
+   * This is safe because the TaskInfo accumulables are no longer needed once they are shipped
+   * to the DAGScheduler, where they are aggregated. Additionally, the original TaskInfo, and
+   * not a clone, must be sent to the DAGScheduler because the same TaskInfo object is sent to
+   * the DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events, so the
+   * TaskInfo sent to the DAGScheduler must always reference the same object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }
+      taskInfos(taskId) = clonedTaskInfo
+    }
+    sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo)
Review Comment:
It seems the `DAGScheduler` needs `taskInfo.accumulables`, so why send the
`CompletionEvent` after cloning the TaskInfo with empty accumulables?
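
For context, the diff above passes the original `taskInfo` (accumulables intact) to
`DAGScheduler.taskEnded` and only stores the stripped clone back into `taskInfos` /
`taskAttempts`. Below is a minimal, self-contained sketch of that retention pattern;
the types are simplified, hypothetical stand-ins, not Spark's actual TaskInfo /
TaskSetManager / DAGScheduler API.

// Sketch only: simplified stand-ins for the Spark classes discussed above.
object AccumulablesReleaseSketch {

  // Stand-in for TaskInfo; `accumulables` is the heavy driver-side state.
  final case class TaskInfo(taskId: Long, index: Int, accumulables: Seq[String]) {
    def cloneWithEmptyAccumulables(): TaskInfo = copy(accumulables = Nil)
  }

  // Stand-in for DAGScheduler.taskEnded: the one place that still needs the
  // accumulables, since it aggregates them exactly once on task completion.
  def taskEnded(info: TaskInfo): Unit =
    println(s"task ${info.taskId} ended with ${info.accumulables.size} accumulables")

  def main(args: Array[String]): Unit = {
    // Stand-in for conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION).
    val dropAccumulables = true

    // Long-lived bookkeeping, analogous to TaskSetManager.taskInfos.
    val taskInfos = scala.collection.mutable.Map.empty[Long, TaskInfo]
    val original = TaskInfo(taskId = 0L, index = 0, accumulables = Seq("a", "b", "c"))
    taskInfos(original.taskId) = original

    if (dropAccumulables) {
      // Retain only a stripped clone locally, so the heavy accumulables can be
      // garbage-collected once the original leaves this class.
      taskInfos(original.taskId) = original.cloneWithEmptyAccumulables()
    }

    // The ORIGINAL object (accumulables intact) is what goes downstream, so the
    // scheduler still aggregates the values, and listeners that compare TaskInfo
    // identity across events keep seeing the same object.
    taskEnded(original)                                   // prints 3 accumulables
    println(taskInfos(original.taskId).accumulables.size) // prints 0
  }
}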