Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/11105#discussion_r86486359
--- Diff: core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala ---
@@ -42,18 +60,45 @@ private[spark] case class AccumulatorMetadata(
 * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely
 * (e.g., synchronized collections) because it will be read from other threads.
 */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
   private[spark] var metadata: AccumulatorMetadata = _
-  private[this] var atDriverSide = true
+  private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the current function is
+   * processing in. If a partition is fully processed the results for that partition/shuffle/rdd
+   * combination are sent back to the driver. The driver keeps track of which rdd/shuffle/partitions
+   * already have been applied, and only combines values into value_ if the rdd/shuffle/partition
+   * has not already been aggregated on the driver program
+   */
+  // For data property accumulators pending and processed updates.
+  // Pending and processed are keyed by (rdd id, shuffle id, partition id)
+  private[spark] lazy val pending =
+    new mutable.HashMap[TaskOutputId, AccumulatorV2[IN, OUT]]()
+  // Completed contains the set of (rdd id, shuffle id, partition id) that have been
+  // fully processed on the worker side. This is used to determine if the updates should
+  // be merged on the driver for a particular rdd/shuffle/partition combination.
+  private[spark] lazy val completed = new mutable.HashSet[TaskOutputId]()
+  // rddProcessed is keyed by rdd id and the value is a bitset containing all partitions
+  // for the given key which have been merged into the value. This is used on the driver.
+  @transient private[spark] lazy val rddProcessed = new mutable.HashMap[Int, mutable.BitSet]()
+  // shuffleProcessed is the same as rddProcessed except keyed by shuffle id.
+  @transient private[spark] lazy val shuffleProcessed = new mutable.HashMap[Int, mutable.BitSet]()
--- End diff --
I had a comment here earlier to check my understanding of this -- GitHub is
being really weird with these comments, so I'm going to repost:
----
I find the logic around `pending`, `completed`, and `processed` really
confusing. I'm still thinking this through -- before making concrete
suggestions I want to check my understanding.
`completed` should only ever reflect the updates from one task. It should
only be set on the executor. At first I was going to say it shouldn't be a set,
it should just be one value -- but of course one task can get updates from
multiple rdds. It will differ from the key set of `pending` because `pending`
can include updates from partitions that aren't completed (e.g., the take and
coalesce cases).
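To check myself, here's a toy sketch of how I'm reading the worker-side
bookkeeping (values simplified to a `Long` instead of a nested
`AccumulatorV2`, and `RDDOutputId` is just a hypothetical stand-in for
whatever concrete `TaskOutputId` subtype the PR actually defines):

```scala
import scala.collection.mutable

// Hypothetical stand-in for the PR's TaskOutputId hierarchy.
sealed trait TaskOutputId
case class RDDOutputId(rddId: Int, partitionId: Int) extends TaskOutputId

// One task's state; accumulator values simplified to Long.
val pending = new mutable.HashMap[TaskOutputId, Long]()
val completed = new mutable.HashSet[TaskOutputId]()

// The task touches output for two rdds...
pending(RDDOutputId(rddId = 1, partitionId = 0)) = 10L
pending(RDDOutputId(rddId = 2, partitionId = 0)) = 5L

// ...but only the first partition is fully consumed (say a take() stopped
// early on the second), so only it is marked completed:
completed += RDDOutputId(1, 0)

// pending's key set is a superset of completed -- the (2, 0) update still
// goes to the driver, which should decline to merge it.
assert(completed.subsetOf(pending.keySet))
```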
`processed` is only used on the driver. Since the driver has global knowledge,
it can use this to determine whether each update is a repeat or not. (Now this
is split into `rddProcessed` and `shuffleProcessed`.)
I think the comment "If a partition is fully processed the results for that
partition/shuffle/rdd combination are sent back to the driver" is incorrect --
it looks to me like the updates are always sent back to the driver, and the
driver always makes the call on whether or not to merge.
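In other words, I picture the driver-side decision as something like the
following (the `BitSet` bookkeeping mirrors `rddProcessed` from the diff;
`mergeIfNew` is my name for illustration, not anything in the PR):

```scala
import scala.collection.mutable

// Per-rdd BitSet of partitions whose updates have already been merged.
val rddProcessed = new mutable.HashMap[Int, mutable.BitSet]()

// Every update arrives at the driver; the BitSet decides whether it is new.
// `mergeIfNew` is a hypothetical helper, not something in the PR.
def mergeIfNew(rddId: Int, partitionId: Int, update: Long, merge: Long => Unit): Unit = {
  val seen = rddProcessed.getOrElseUpdate(rddId, new mutable.BitSet())
  if (!seen.contains(partitionId)) {
    seen += partitionId
    merge(update) // first time we see this rdd/partition: apply it
  }
  // otherwise: a speculative or re-run task produced a duplicate; drop it
}

// Two tasks report the same partition (speculation/retry) -- only the
// first update is merged.
var total = 0L
mergeIfNew(rddId = 1, partitionId = 0, update = 10L, merge = (v: Long) => total += v)
mergeIfNew(rddId = 1, partitionId = 0, update = 10L, merge = (v: Long) => total += v)
assert(total == 10L)
```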
Seems like `merge` (and thus `dataPropertyMerge`) is only called on the
driver. I'm trying to figure out whether or not it matters. It seems like it
would do something weird if it were called on the executor. Maybe we should
add an `assert(atDriverSide)`.
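Something as small as this at the top of the merge path would make the
contract explicit (using the `atDriverSide` flag this diff widens to
`private[spark]`):

```scala
// Hypothetical guard -- fail fast if a data property merge runs on an executor.
assert(atDriverSide, "data property accumulator updates must be merged on the driver")
```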
----
Assuming that is correct, I'd suggest some renamings:
`completed` to `completedTaskOutputsForOneTask`,
and include a comment that its key set differs from `pending`'s because some
of the updates in `pending` may be from incomplete partition processing.
`pending` to `pendingAccumulatorUpdatesFromOneTask`,
and have the comment mention that this tracks all accumulator updates from
one task, but that some of them may be ignored if the output they represent
isn't fully computed, e.g. an `rdd.take()` leads to incomplete partition
processing.