Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/10835#discussion_r50807020
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---
@@ -1074,39 +1074,43 @@ class DAGScheduler(
}
}
- /** Merge updates from a task to our local accumulator values */
+ /**
+ * Merge local values from a task into the corresponding accumulators previously registered
+ * here on the driver.
+ *
+ * Although accumulators themselves are not thread-safe, this method is called only from one
+ * thread, the one that runs the scheduling loop. This means we only handle one task
+ * completion event at a time so we don't need to worry about locking the accumulators.
+ * This still doesn't stop the caller from updating the accumulator outside the scheduler,
+ * but that's not our problem since there's nothing we can do about that.
+ */
private def updateAccumulators(event: CompletionEvent): Unit = {
val task = event.task
val stage = stageIdToStage(task.stageId)
- if (event.accumUpdates != null) {
- try {
- Accumulators.add(event.accumUpdates)
-
- event.accumUpdates.foreach { case (id, partialValue) =>
- // In this instance, although the reference in Accumulators.originals is a WeakRef,
- // it's guaranteed to exist since the event.accumUpdates Map exists
-
- val acc = Accumulators.originals(id).get match {
- case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]]
- case None => throw new NullPointerException("Non-existent
reference to Accumulator")
- }
-
- // To avoid UI cruft, ignore cases where value wasn't updated
- if (acc.name.isDefined && partialValue != acc.zero) {
- val name = acc.name.get
- val value = s"${acc.value}"
- stage.latestInfo.accumulables(id) =
- new AccumulableInfo(id, name, None, value, acc.isInternal)
- event.taskInfo.accumulables +=
- new AccumulableInfo(id, name, Some(s"$partialValue"), value,
acc.isInternal)
- }
+ try {
+ event.accumUpdates.foreach { ainfo =>
+ assert(ainfo.update.isDefined, "accumulator from task should have
a partial value")
+ val id = ainfo.id
+ val partialValue = ainfo.update.get
+ // Find the corresponding accumulator on the driver and update it
+ val acc: Accumulable[Any, Any] = Accumulators.get(id) match {
--- End diff --
Could this use `getOrElse` instead of the `match`?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]