Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/11105#discussion_r61837690
--- Diff: core/src/main/scala/org/apache/spark/AccumulatorV2.scala ---
@@ -21,33 +21,77 @@ import java.{lang => jl}
import java.io.ObjectInputStream
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.util.Utils
-
+/**
+ * Metadata for the Accumulator
+ *
+ * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
+ *                          for system and time metrics like serialization time or bytes spilled,
+ *                          and false for things with absolute values like number of input rows.
+ *                          This should be used for internal metrics only.
+ * @param dataProperty Data property accumulators will only have values added once for each
+ *                     RDD/Partition/Shuffle combination. This prevents double counting on
+ *                     reevaluation. Partial evaluation of a partition will not increment a data
+ *                     property accumulator. Data property accumulators are currently experimental
+ *                     and the behaviour may change in future versions.
+ */
private[spark] case class AccumulatorMetadata(
id: Long,
name: Option[String],
- countFailedValues: Boolean) extends Serializable
+ countFailedValues: Boolean,
+ dataProperty: Boolean) extends Serializable
 /**
  * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of
  * type `OUT`.
  */
-abstract class AccumulatorV2[IN, OUT] extends Serializable {
+abstract class AccumulatorV2[@specialized(Int, Long, Double) IN, OUT] extends Serializable {
private[spark] var metadata: AccumulatorMetadata = _
- private[this] var atDriverSide = true
+ private[spark] var atDriverSide = true
+
+  /**
+   * The following values are used for data property [[AccumulatorV2]]s.
+   * Data property [[AccumulatorV2]]s have only-once semantics. These semantics are implemented
+   * by keeping track of which RDD id, shuffle id, and partition id the current function is
--- End diff --
If we can know that we are re-computing tasks at the executor side, then we can
ignore the accumulator values at the executor side, and don't need to track the RDD
id, shuffle id, and partition id at the driver side, right?
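To make the trade-off concrete, the driver-side tracking the patch proposes can be sketched as follows. This is a simplified, hypothetical model of only-once ("data property") semantics, not Spark's actual implementation: the class name `OnceOnlyLongAccumulator` and the `mergePartition` method are illustrative, and real accumulators also key on shuffle id.

```scala
import scala.collection.mutable

// Hypothetical sketch of only-once ("data property") accumulator semantics:
// the driver records which (rddId, partitionId) combinations have already
// contributed, so a re-computed partition cannot double count.
class OnceOnlyLongAccumulator {
  private var sum: Long = 0L
  // Combinations whose updates were already merged at the driver.
  private val merged = mutable.Set.empty[(Int, Int)]

  /** Merge a partition's update only the first time that partition reports. */
  def mergePartition(rddId: Int, partitionId: Int, update: Long): Unit = {
    // Set#add returns false if the element was already present,
    // so a duplicate report from a re-run partition is dropped.
    if (merged.add((rddId, partitionId))) {
      sum += update
    }
  }

  def value: Long = sum
}
```

Under this model the tracking lives at the driver; the question above is whether the same guarantee could instead come from the executor knowing it is re-computing, which would make the `merged` set unnecessary.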