Github user holdenk commented on a diff in the pull request:
    --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
    @@ -136,15 +179,76 @@ abstract class AccumulatorV2[IN, OUT] extends 
Serializable {
       def reset(): Unit
    +   * Takes the inputs and accumulates. e.g. it can be a simple `+=` for 
counter accumulator.
    +   * Developers should extend addImpl to customize the adding 
    +   */
    +  final def add(v: IN): Unit = {
    +    if (metadata != null && metadata.dataProperty) {
    +      dataPropertyAdd(v)
    +    } else {
    +      addImpl(v)
    +    }
    +  }
    +  private def dataPropertyAdd(v: IN): Unit = {
    +    // Add first for localValue & AccumulatorInfo
    --- End diff --
    If we want the user to be able to access the current accumulated value from 
their process worker side then we need to perform a "normal" add as well as the 
data property add. Also if we don't have a merge step (e.g. only one 
accumulator) this might make a difference (although I _think_ this won't be the 
case anymore with the refactor that happened I haven't tested it).

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at or file a JIRA ticket
with INFRA.

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to