Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/11105#discussion_r86486890
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
def reset(): Unit
/**
+ * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+ * Developers should extend addImpl to customize the adding functionality.
+ */
+ final def add(v: IN): Unit = {
+ if (metadata != null && metadata.dataProperty) {
+ dataPropertyAdd(v)
+ } else {
+ addImpl(v)
+ }
+ }
+
+ private def dataPropertyAdd(v: IN): Unit = {
+ // To allow the user to be able to access the current accumulated value from their process
+ // worker side then we need to perform a "normal" add as well as the data property add.
+ addImpl(v)
+ // Add to the pending updates for data property
+ val updateInfo = TaskContext.get().getRDDPartitionInfo()
+ val base = pending.getOrElse(updateInfo, copyAndReset())
+ // Since we may have constructed a new accumulator, set atDriverSide to false as the default
+ // new accumulators will have atDriverSide equal to true.
+ base.atDriverSide = false
+ base.addImpl(v)
+ pending(updateInfo) = base
+ }
+
+ /**
+ * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for
+ * non-data property accumulables.
+ */
+ private[spark] def markFullyProcessed(taskOutputId: TaskOutputId): Unit = {
+ if (metadata.dataProperty) {
+ completed += taskOutputId
+ }
+ }
+
+ /**
+ * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator.
+ * Developers should extend addImpl to customize the adding functionality.
* Takes the inputs and accumulates.
*/
- def add(v: IN): Unit
+ protected[spark] def addImpl(v: IN)
+
+ /**
+ * Merges another same-type accumulator into this one and update its state, i.e. this should be
+ * merge-in-place. Developers should extend mergeImpl to customize the merge functionality.
+ */
+ final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = {
+ assert(isAtDriverSide)
+ // Handle data property accumulators
+ if (metadata != null && metadata.dataProperty) {
+ dataPropertyMerge _
+ } else {
+ mergeImpl _
+ }
+ }
+
+ final private[spark] def dataPropertyMerge(other: AccumulatorV2[IN, OUT]) = {
+ // Apply all foreach partitions regardless - they can only be fully evaluated
+ val unprocessed = other.pending.filter{
+ case (ForeachOutputId(), v) => mergeImpl(v); false
--- End diff --
The idea is that if you use an accumulator in a foreach, you get the same
behavior whether or not it is a data-property accumulator, right? That makes
sense. Before this last change, would data-property accumulators have been
ignored entirely in a foreach?
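To make the question concrete, here is a minimal, self-contained Scala sketch (no Spark dependency) of the merge semantics being discussed. `OutputId`, `ForeachOutput`, `RddPartition`, `SketchAccumulator`, and `run` are hypothetical names, not the PR's classes. The rule that a non-foreach output is counted once, and only after `markFullyProcessed`, is an assumption: the diff is cut off before that branch of `dataPropertyMerge`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the PR's output ids (not Spark classes).
sealed trait OutputId
case object ForeachOutput extends OutputId
final case class RddPartition(rddId: Int, partition: Int) extends OutputId

// Sketch of a Long-valued accumulator with optional data-property semantics.
final class SketchAccumulator(val dataProperty: Boolean) {
  var value: Long = 0L
  // Worker side: updates recorded per output (data-property only).
  val pending = mutable.Map.empty[OutputId, Long]
  // Worker side: outputs this task finished processing.
  val completed = mutable.Set.empty[OutputId]
  // Driver side: outputs whose updates have already been merged once.
  private val merged = mutable.Set.empty[OutputId]

  def add(v: Long, current: OutputId): Unit = {
    value += v // plain add, so the running value is visible on the worker
    if (dataProperty) pending(current) = pending.getOrElse(current, 0L) + v
  }

  def markFullyProcessed(id: OutputId): Unit =
    if (dataProperty) completed += id

  def merge(other: SketchAccumulator): Unit =
    if (!dataProperty) value += other.value
    else other.pending.foreach {
      // foreach updates are applied unconditionally, as in the diff
      case (ForeachOutput, v) => value += v
      // assumption: other outputs count once, and only if fully processed
      case (id, v) if other.completed.contains(id) && !merged.contains(id) =>
        value += v
        merged += id
      case _ => () // partial or recomputed partition: skipped
    }
}

// Demo: two task attempts over the same partition, each also running a foreach.
def run(dataProperty: Boolean): Long = {
  val driver = new SketchAccumulator(dataProperty)
  val p0 = RddPartition(rddId = 1, partition = 0)
  for (_ <- 1 to 2) { // the second iteration models a recomputed partition
    val task = new SketchAccumulator(dataProperty)
    task.add(5, p0); task.markFullyProcessed(p0)
    task.add(2, ForeachOutput)
    driver.merge(task)
  }
  driver.value
}
```

Under these assumptions the foreach contributes 2 per attempt in both modes, so its behavior matches a plain accumulator; the modes differ only in the partition update, which the data-property version counts once (`run(true)` is 9, `run(false)` is 14).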