Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11105#discussion_r56422706
  
    --- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
    @@ -114,23 +163,40 @@ class Accumulable[R, T] private (
        * same mutable instance around.
        */
       private[spark] def copy(): Accumulable[R, T] = {
    -    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
    +    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues, consistent)
       }
     
       /**
        * Add more data to this accumulator / accumulable
        * @param term the data to add
        */
    -  def += (term: T) { value_ = param.addAccumulator(value_, term) }
    +  def += (term: T) { add(term) }
     
       /**
        * Add more data to this accumulator / accumulable
        * @param term the data to add
        */
    -  def add(term: T) { value_ = param.addAccumulator(value_, term) }
    +  def add(term: T) {
    +    value_ = param.addAccumulator(value_, term)
    +    if (consistent) {
    +      val updateInfo = TaskContext.get().getRDDPartitionInfo()
    +      val base = pending.getOrElse(updateInfo, zero)
    +      pending(updateInfo) = param.addAccumulator(base, term)
    +    }
    +  }
     
       /**
    -   * Merge two accumulable objects together
    +   * Mark a specific rdd/shuffle/partition as completely processed. This is a no-op for
    +   * non-consistent accumulables.
    +   */
    +  private[spark] def markFullyProcessed(rddId: Int, shuffleId: Int, partitionId: Int): Unit = {
    +    if (consistent) {
    +      completed += ((rddId, shuffleId, partitionId))
    --- End diff --
    
    I'm trying to understand the role of shuffleId. Is the point that if you have a `combineByKey`, you need to separately count the accumulator updates on the map-side and the reduce-side? If so, this also probably needs an explanation in a comment somewhere. We probably need a big block comment explaining how consistent accumulators work at a high level, pointing out a few of the more important corner cases (e.g., cache eviction, take(), coalesce(), combineByKey()), but pointing at `ConsistentAccumulatorSuite` for more details.
    
    Also, I'd rename `shuffleId` to `shuffleWriteId` or something to be more clear, since it seems like you could actually choose either one (but of course you need to be consistent), so it would help the reader to have the name be more explicit.
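    
    To make the double-counting concern concrete, here is a minimal self-contained Scala sketch of the idea the diff above seems to implement. All names here are illustrative (this is not Spark's actual API): updates are buffered per `(rddId, shuffleWriteId, partitionId)` key and folded into the total at most once, so a re-run of the same partition cannot be counted twice, while map-side and reduce-side work carry different keys and count separately.
    
    ```scala
    import scala.collection.mutable
    
    object ConsistentAccumulatorSketch {
      // Hypothetical key: (rddId, shuffleWriteId, partitionId).
      type UpdateKey = (Int, Int, Int)
    
      class ConsistentLongAccumulator {
        private var total = 0L
        private val pending = mutable.Map.empty[UpdateKey, Long]
        private val completed = mutable.Set.empty[UpdateKey]
    
        // Buffer an update against the partition that produced it.
        def add(key: UpdateKey, v: Long): Unit =
          pending(key) = pending.getOrElse(key, 0L) + v
    
        // Fold in a partition's updates at most once; a re-run of an
        // already-completed partition is dropped instead of double-counted.
        def markFullyProcessed(key: UpdateKey): Unit = {
          if (!completed.contains(key)) {
            completed += key
            total += pending.getOrElse(key, 0L)
          }
          pending.remove(key)
        }
    
        def value: Long = total
      }
    
      def main(args: Array[String]): Unit = {
        val acc = new ConsistentLongAccumulator
        // Map side of a combineByKey: partition 0 of rdd 1, shuffle write 7.
        acc.add((1, 7, 0), 10L)
        acc.markFullyProcessed((1, 7, 0))
        // The same partition re-runs (e.g. after a fetch failure): ignored.
        acc.add((1, 7, 0), 10L)
        acc.markFullyProcessed((1, 7, 0))
        // Reduce side has a different key, so it counts separately.
        acc.add((2, -1, 0), 5L)
        acc.markFullyProcessed((2, -1, 0))
        println(acc.value) // 15, not 25
      }
    }
    ```
    
    The sketch is why the choice of map-side vs. reduce-side id must be consistent: either works as long as every task for the same unit of work produces the same key.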


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to