Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/11105#discussion_r83287564 --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala --- @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable { def reset(): Unit /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. + */ + final def add(v: IN): Unit = { + if (metadata != null && metadata.dataProperty) { + dataPropertyAdd(v) + } else { + addImpl(v) + } + } + + private def dataPropertyAdd(v: IN): Unit = { + // Add first for localValue & AccumulatorInfo + addImpl(v) + if (metadata != null && metadata.dataProperty) { + val updateInfo = TaskContext.get().getRDDPartitionInfo() + val base = pending.getOrElse(updateInfo, copyAndReset()) + base.atDriverSide = false + base.addImpl(v) + pending(updateInfo) = base + } + } + + /** + * Mark a specific rdd/shuffle/partition as completely processed. This is a noop for + * non-data property accumuables. + */ + private[spark] def markFullyProcessed(rddId: Int, shuffleWriteId: Int, partitionId: Int): Unit = { + if (metadata.dataProperty) { + completed += ((rddId, shuffleWriteId, partitionId)) + } + } + + /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + * Developers should extend addImpl to customize the adding functionality. * Takes the inputs and accumulates. */ - def add(v: IN): Unit + protected[spark] def addImpl(v: IN) /** * Merges another same-type accumulator into this one and update its state, i.e. this should be - * merge-in-place. + * merge-in-place. Developers should extend mergeImpl to customize the merge functionality. 
*/ - def merge(other: AccumulatorV2[IN, OUT]): Unit + final private[spark] lazy val merge: (AccumulatorV2[IN, OUT] => Unit) = { + // Handle data property accumulators --- End diff -- This comment is kind of pointless -- it is pretty obvious that this is what the `if` is doing. What is really needed is an explanation of *why* -- maybe that belongs as a doc comment on `dataPropertyMerge`.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org