Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r86486618

    --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
    @@ -136,15 +181,92 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
       def reset(): Unit

      /**
    +   * Takes the inputs and accumulates them, e.g. a simple `+=` for a counter accumulator.
    +   * Developers should override addImpl to customize the adding functionality.
    +   */
    +  final def add(v: IN): Unit = {
    +    if (metadata != null && metadata.dataProperty) {
    +      dataPropertyAdd(v)
    +    } else {
    +      addImpl(v)
    +    }
    +  }
    +
    +  private def dataPropertyAdd(v: IN): Unit = {
    +    // Perform a "normal" add as well as the data property add, so that the user can
    +    // still read the current accumulated value on the worker side.
    +    addImpl(v)
    +    // Record the update in the pending updates map for data property handling.
    +    val updateInfo = TaskContext.get().getRDDPartitionInfo()
    +    val base = pending.getOrElse(updateInfo, copyAndReset())
    +    // Since we may have just constructed a new accumulator, set atDriverSide to false;
    +    // newly created accumulators default to atDriverSide == true.
    +    base.atDriverSide = false
    +    base.addImpl(v)
    +    pending(updateInfo) = base
    +  }
    +
    +  /**
    +   * Mark a specific rdd/shuffle/partition as completely processed. This is a no-op for
    +   * non-data property accumulables.
    --- End diff --

    Maybe add something like "see [[TaskOutputId]] for an explanation of why this is important for data property accumulators"? I worry the purpose of this may seem a little mysterious on its own ... figure we should at least point to the more complete description.
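    [Editor's note: for context, here is a standalone sketch of the dispatch pattern in the diff above, written for this note rather than taken from the PR. It has no Spark dependency: `add` is final and either calls `addImpl` directly or, for data property accumulators, also records the update in a per-partition pending map. `TaskContext.get().getRDDPartitionInfo()` is stubbed with a plain Int partition id, and the names SketchAccumulator and SketchLongSum are hypothetical.]

    import scala.collection.mutable

    abstract class SketchAccumulator[IN, OUT] {
      // Pending per-partition updates, keyed here by a stand-in partition id
      // (the real PR keys these by the task's RDD/shuffle/partition identity).
      protected val pending = mutable.Map.empty[Int, SketchAccumulator[IN, OUT]]
      var dataProperty: Boolean = false

      // Final wrapper: subclasses override addImpl, never add itself.
      final def add(partitionId: Int, v: IN): Unit = {
        if (dataProperty) {
          // "Normal" add so the running task can still read the current value...
          addImpl(v)
          // ...plus a pending, per-partition copy for later deduplication.
          val base = pending.getOrElseUpdate(partitionId, copyAndReset())
          base.addImpl(v)
        } else {
          addImpl(v)
        }
      }

      protected def addImpl(v: IN): Unit
      protected def copyAndReset(): SketchAccumulator[IN, OUT]
      def value: OUT
    }

    class SketchLongSum extends SketchAccumulator[Long, Long] {
      private var sum = 0L
      override protected def addImpl(v: Long): Unit = sum += v
      override protected def copyAndReset(): SketchLongSum = new SketchLongSum
      override def value: Long = sum
    }

    With dataProperty = true, calling acc.add(0, 10L) and acc.add(0, 5L) both updates the local sum (visible to the running task) and builds up pending(0); in the real implementation those pending updates are what lets the driver count a replayed partition's contribution only once.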