Github user squito commented on a diff in the pull request:
    --- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
    @@ -42,18 +60,43 @@ private[spark] case class AccumulatorMetadata(
      * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely
      * (e.g., synchronized collections) because it will be read from other threads.
      */
    -abstract class AccumulatorV2[IN, OUT] extends Serializable {
    +abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
       private[spark] var metadata: AccumulatorMetadata = _
    -  private[this] var atDriverSide = true
    +  private[spark] var atDriverSide = true
    +  /**
    +   * The following values are used for data property [[AccumulatorV2]]s.
    +   * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented
    +   * by keeping track of which RDD id, shuffle id, and partition id the current function is
    +   * processing in. If a partition is fully processed the results for that
    +   * combination are sent back to the driver. The driver keeps track of which rdd/shuffle/partitions
    +   * already have been applied, and only combines values into value_ if the rdd/shuffle/partition
    +   * has not already been aggregated on the driver program
    +   */
    +  // For data property accumulators pending and processed updates.
    +  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
    +  private[spark] lazy val pending =
    +    new mutable.HashMap[(Int, Int, Int), AccumulatorV2[IN, OUT]]()
    +  // Completed contains the set of (rdd id, shuffle id, partition id) that have been
    +  // fully processed on the worker side. This is used to determine if the updates should
    +  // be sent back to the driver for a particular rdd/shuffle/partition
    +  private[spark] lazy val completed = new mutable.HashSet[(Int, Int, Int)]()
    +  // Processed is keyed by (rdd id, shuffle id) and the value is a bitset containing all partitions
    +  // for the given key which have been merged into the value. This is used on the driver.
    +  @transient private[spark] lazy val processed = new mutable.HashMap[(Int, Int), mutable.BitSet]()
    --- End diff ---
    I find the logic around `pending`, `completed` and `processed` really 
confusing.  I'm still thinking this through -- before making concrete 
suggestions I want to check my understanding.
    `completed` should always only reflect the update from one task.  It should 
only be set on the executor.  At first I was going to say it shouldn't be a 
set, it should just be one value -- but of course one task can get updates from 
multiple rdds.  It will differ from the keyset of `pending` because `pending` can 
include updates from partitions that aren't completed (e.g. those `take` and 
`coalesce` cases).
    `processed` is only used on the driver.  Since the driver has global 
knowledge, it can use this to determine whether each update is a repeat or not.
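    To make sure I have the driver-side part right, here is a rough standalone 
sketch of what I think `processed` is doing (not the patch itself -- `mergeIfNew`, 
the plain `Long` value, and the demo object are all made up for illustration):

    ```scala
    import scala.collection.mutable

    // Standalone sketch of the driver-side dedup I think `processed` gives us.
    // Keyed by (rdd id, shuffle id); the BitSet records which partitions have
    // already been folded into the accumulated value.
    class DriverDedupSketch {
      private val processed = new mutable.HashMap[(Int, Int), mutable.BitSet]()
      private var value: Long = 0L

      // Merge a completed partition's update only if this
      // (rdd id, shuffle id, partition id) combination hasn't been seen before.
      def mergeIfNew(rddId: Int, shuffleId: Int, partitionId: Int, update: Long): Unit = {
        val seen = processed.getOrElseUpdate((rddId, shuffleId), new mutable.BitSet())
        if (seen.add(partitionId)) {
          value += update
        }
        // if add() returned false the partition was already counted, so the
        // duplicate update is dropped -- the only-once semantics described above
      }

      def currentValue: Long = value
    }

    object DriverDedupSketchDemo extends App {
      val acc = new DriverDedupSketch
      acc.mergeIfNew(rddId = 1, shuffleId = -1, partitionId = 0, update = 10L)
      acc.mergeIfNew(rddId = 1, shuffleId = -1, partitionId = 0, update = 10L)  // re-run task, ignored
      println(acc.currentValue)  // 10
    }
    ```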
    I think the comment about whether updates should be sent back to the driver is 
incorrect -- it looks to me like the updates are always sent back to the 
driver, and the driver makes all of the determinations itself.
    Seems like `merge` (and thus `dataPropertyMerge`) is only called on the 
driver.  I'm trying to figure out whether or not it matters.  It seems like it 
would do something weird if it were called on the executor.  Maybe we should 
add an `assert(isAtDriver)` -- something like the sketch below.
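    i.e. roughly this shape (toy sketch with a simplified accumulator, just to 
show where the guard would go -- in this diff the real flag is `atDriverSide`, 
not `isAtDriver`):

    ```scala
    // Toy accumulator, not the real AccumulatorV2 -- only meant to show the guard.
    class ToyAccum(val atDriverSide: Boolean) {
      private var sum: Long = 0L

      def dataPropertyMerge(otherSum: Long): Unit = {
        // fail fast if an executor ever ends up on this driver-only merge path
        assert(atDriverSide, "dataPropertyMerge should only be called on the driver")
        sum += otherSum
      }

      def value: Long = sum
    }
    ```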
