mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1441419344
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before
reaching here.
// Note: "result.value()" only deserializes the value when it's called at
the first time, so
// here "result.value()" just returns the value and won't block other
threads.
- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
result.accumUpdates,
- result.metricPeaks, info)
+
+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success,
result.value(),
+ result.accumUpdates, result.metricPeaks, info)
maybeFinishTaskSet()
}
+ /**
+ * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the
accumulables for the
+ * TaskInfo object, corresponding to the completed task, referenced by this
class.
+ *
+ * SPARK-46383: For the completed task, we ship the original TaskInfo to the
DAGScheduler and only
+ * retain a cloned TaskInfo in this class. We then set the accumulables to
Nil for the TaskInfo
+ * object that corresponds to the completed task.
+ * We do this to release references to `TaskInfo.accumulables()` as the
TaskInfo
+ * objects held by this class are long-lived and have a heavy memory
footprint on the driver.
+ *
+ * This is safe as the TaskInfo accumulables are not needed once they are
shipped to the
+ * DAGScheduler where they are aggregated. Additionally, the original
TaskInfo, and not a
+ * clone, must be sent to the DAGScheduler as this TaskInfo object is sent
to the
+ * DAGScheduler on multiple events during the task's lifetime. Users can
install
+ * SparkListeners that compare the TaskInfo objects across these
SparkListener events and
+ * thus the TaskInfo object sent to the DAGScheduler must always reference
the same TaskInfo
+ * object.
+ */
+ private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+ taskId: Long,
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Seq[AccumulatorV2[_, _]],
+ metricPeaks: Array[Long],
+ taskInfo: TaskInfo): Unit = {
+ val index = taskInfo.index
+ if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
Review Comment:
Pull `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` out into a private field.
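For example, something along these lines (the field name here is illustrative, not from the PR):

```scala
// Read the flag once when the TaskSetManager is constructed instead of on every task completion.
private val dropTaskInfoAccumulables: Boolean =
  conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)
```

That also gives the other call sites a single field to check.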
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -787,6 +787,9 @@ private[spark] class TaskSetManager(
// SPARK-37300: when the task was already finished state, just ignore it,
// so that there won't cause successful and tasksSuccessful wrong result.
if(info.finished) {
+ // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+ // lifetime.
+ info.resetAccumulables()
Review Comment:
Only when `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` is true?
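i.e. roughly (assuming the flag has been hoisted into a private field as suggested above; the name is illustrative):

```scala
if (dropTaskInfoAccumulables) {
  // SPARK-46383: only drop the accumulables when the feature flag is enabled.
  info.resetAccumulables()
}
```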
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before
reaching here.
// Note: "result.value()" only deserializes the value when it's called at
the first time, so
// here "result.value()" just returns the value and won't block other
threads.
- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
result.accumUpdates,
- result.metricPeaks, info)
+
+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success,
result.value(),
+ result.accumUpdates, result.metricPeaks, info)
maybeFinishTaskSet()
}
+ /**
+ * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the
accumulables for the
+ * TaskInfo object, corresponding to the completed task, referenced by this
class.
+ *
+ * SPARK-46383: For the completed task, we ship the original TaskInfo to the
DAGScheduler and only
+ * retain a cloned TaskInfo in this class. We then set the accumulables to
Nil for the TaskInfo
+ * object that corresponds to the completed task.
+ * We do this to release references to `TaskInfo.accumulables()` as the
TaskInfo
+ * objects held by this class are long-lived and have a heavy memory
footprint on the driver.
+ *
+ * This is safe as the TaskInfo accumulables are not needed once they are
shipped to the
+ * DAGScheduler where they are aggregated. Additionally, the original
TaskInfo, and not a
+ * clone, must be sent to the DAGScheduler as this TaskInfo object is sent
to the
+ * DAGScheduler on multiple events during the task's lifetime. Users can
install
+ * SparkListeners that compare the TaskInfo objects across these
SparkListener events and
+ * thus the TaskInfo object sent to the DAGScheduler must always reference
the same TaskInfo
+ * object.
+ */
+ private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+ taskId: Long,
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Seq[AccumulatorV2[_, _]],
+ metricPeaks: Array[Long],
+ taskInfo: TaskInfo): Unit = {
+ val index = taskInfo.index
Review Comment:
super nit: pull this variable into the `if` block below
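Roughly (a sketch of the nit, not the PR's exact body):

```scala
if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
  // `index` is only needed on this branch, so compute it here.
  val index = taskInfo.index
  // ... existing logic that uses `index` ...
}
```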
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -874,6 +917,9 @@ private[spark] class TaskSetManager(
// SPARK-37300: when the task was already finished state, just ignore it,
// so that there won't cause copiesRunning wrong result.
if (info.finished) {
+ // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+ // lifetime.
+ info.resetAccumulables()
Review Comment:
Same as above: only when `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` is true?
##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -229,6 +234,77 @@ class TaskSetManagerSuite
super.afterEach()
}
+ test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager
must not be " +
+ "accessed once the task has completed") {
+ val conf = new SparkConf().
+ set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = FakeTask.createTaskSet(1)
+ val clock = new ManualClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdates = taskSet.tasks.head.metrics.internalAccums
+
+ // Offer a host. This will launch the first task.
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+
+ clock.advance(1)
+ // Tell it the first task has finished successfully
+ manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
+ assert(sched.endedTasks(0) === Success)
+
+ val e = intercept[SparkException]{
+ manager.taskInfos.head._2.accumulables
+ }
+ assert(e.getMessage.contains("Accumulables for the TaskInfo have been cleared"))
+ }
+
+ test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") {
+ val conf = new SparkConf().
+ set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+ sc = new SparkContext("local", "test", conf)
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = FakeTask.createTaskSet(2)
+ val clock = new ManualClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdates = taskSet.tasks.head.metrics.internalAccums
+
+ // Offer a host. This will launch the first task.
+ val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+ assert(taskOption.isDefined)
+
+ clock.advance(1)
+ // Tell it the first task has finished successfully
+ manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
+ assert(sched.endedTasks(0) === Success)
+
+ // Only one task was launched and it completed successfully, thus the TaskInfo accumulables
+ // should be empty.
+ assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty))
Review Comment:
Rename `l` to avoid confusion with `1` (here and elsewhere).
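For example, either of these avoids the single-letter binding and is equivalent to the existing assertion:

```scala
assert(!manager.taskInfos.exists { case (_, info) => !info.isAccumulablesEmpty })
// or
assert(manager.taskInfos.values.forall(_.isAccumulablesEmpty))
```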
##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
}
+ test("SPARK-46383: Track TaskInfo objects") {
+ val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+ sc = new SparkContext("local", "SparkListenerSuite", conf)
+ val listener = new SaveActiveTaskInfos
+ sc.addSparkListener(listener)
+ val rdd1 = sc.parallelize(1 to 100, 4)
+ sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+ sc.listenerBus.waitUntilEmpty()
+ listener.taskInfos.size should be { 0 }
Review Comment:
I am not sure I follow this test; what is it trying to do?
This test will be successful even with `DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION` = `true`, right? (Since it is simply checking for instance equality in the fired event?)
##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before
reaching here.
// Note: "result.value()" only deserializes the value when it's called at
the first time, so
// here "result.value()" just returns the value and won't block other
threads.
- sched.dagScheduler.taskEnded(tasks(index), Success, result.value(),
result.accumUpdates,
- result.metricPeaks, info)
+
+ emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success,
result.value(),
+ result.accumUpdates, result.metricPeaks, info)
maybeFinishTaskSet()
}
+ /**
+ * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the
accumulables for the
+ * TaskInfo object, corresponding to the completed task, referenced by this
class.
+ *
+ * SPARK-46383: For the completed task, we ship the original TaskInfo to the
DAGScheduler and only
+ * retain a cloned TaskInfo in this class. We then set the accumulables to
Nil for the TaskInfo
+ * object that corresponds to the completed task.
+ * We do this to release references to `TaskInfo.accumulables()` as the
TaskInfo
+ * objects held by this class are long-lived and have a heavy memory
footprint on the driver.
+ *
+ * This is safe as the TaskInfo accumulables are not needed once they are
shipped to the
+ * DAGScheduler where they are aggregated. Additionally, the original
TaskInfo, and not a
+ * clone, must be sent to the DAGScheduler as this TaskInfo object is sent
to the
+ * DAGScheduler on multiple events during the task's lifetime. Users can
install
+ * SparkListeners that compare the TaskInfo objects across these
SparkListener events and
+ * thus the TaskInfo object sent to the DAGScheduler must always reference
the same TaskInfo
+ * object.
+ */
+ private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+ taskId: Long,
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Seq[AccumulatorV2[_, _]],
+ metricPeaks: Array[Long],
+ taskInfo: TaskInfo): Unit = {
Review Comment:
As we are passing `taskId` already, can we drop `taskInfo` from here?
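i.e. something like the sketch below, assuming `taskInfos` maps task ids to their TaskInfo as it does elsewhere in TaskSetManager (this is the shape of the suggestion, not the PR's code):

```scala
private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
    taskId: Long,
    task: Task[_],
    reason: TaskEndReason,
    result: Any,
    accumUpdates: Seq[AccumulatorV2[_, _]],
    metricPeaks: Array[Long]): Unit = {
  // Look the TaskInfo up from the task id instead of taking it as a parameter.
  val taskInfo = taskInfos(taskId)
  // ... rest of the method unchanged ...
}
```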
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]