Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/11105#discussion_r55434378
--- Diff: core/src/main/scala/org/apache/spark/Accumulable.scala ---
@@ -53,42 +54,79 @@ import org.apache.spark.util.Utils
* for system and time metrics like serialization
time or bytes spilled,
* and false for things with absolute values like
number of input rows.
* This should be used for internal metrics only.
- * @tparam R the full accumulated data (result type)
+ * @param consistent if this [[Accumulable]] is consistent. Consistent
[[Accumulable]]s will only
+ * have values added once for each RDD/Partition
execution combination. This
+ * prevents double counting on reevaluation. Partial
evaluation of a partition
+ * will not increment a consistent [[Accumulable]].
Consistent [[Accumulable]]s
+ * are currently experimental and the behaviour may
change in future versions.
+ * Consistent [[Accumulable]]s can only be added to
inside of
+ * [[MapPartitionsRDD]]s and are designed for counting
"data properties".
+ * @tparam R the full accumulated data
* @tparam T partial data that can be added in
*/
-class Accumulable[R, T] private (
+class Accumulable[R, T] private[spark] (
val id: Long,
// SI-8813: This must explicitly be a private val, or else scala 2.11
doesn't compile
@transient private val initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean,
- private[spark] val countFailedValues: Boolean)
+ private[spark] val countFailedValues: Boolean,
+ private[spark] val consistent: Boolean)
extends Serializable {
private[spark] def this(
- initialValue: R,
+ @transient initialValue: R,
+ param: AccumulableParam[R, T],
+ internal: Boolean,
+ countFailedValues: Boolean,
+ consistent: Boolean) = {
+ this(Accumulators.newId(), initialValue, param, None, internal,
countFailedValues, consistent)
+ }
+
+ def this(
+ @transient initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean,
countFailedValues: Boolean) = {
- this(Accumulators.newId(), initialValue, param, name, internal,
countFailedValues)
+ this(Accumulators.newId(), initialValue, param, name, internal,
countFailedValues,
+ false /* consistent */)
}
- private[spark] def this(
- initialValue: R,
+ def this(
+ @transient initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
- internal: Boolean) = {
- this(initialValue, param, name, internal, false /* countFailedValues
*/)
+ internal: Boolean,
+ countFailedValues: Boolean,
+ consistent: Boolean) = {
+ this(Accumulators.newId(), initialValue, param, name, internal,
countFailedValues, consistent)
}
- def this(initialValue: R, param: AccumulableParam[R, T], name:
Option[String]) =
+ def this(
+ @transient initialValue: R,
+ param: AccumulableParam[R, T],
+ name: Option[String],
+ internal: Boolean) =
+ this(initialValue, param, name, internal, false /* countFailed */)
+
+ def this(
+ @transient initialValue: R,
+ param: AccumulableParam[R, T],
+ name: Option[String]) =
this(initialValue, param, name, false /* internal */)
- def this(initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)
+ def this(
+ @transient initialValue: R,
+ param: AccumulableParam[R, T]) =
+ this(initialValue, param, None)
@volatile @transient private var value_ : R = initialValue // Current
value on driver
+ // For consistent accumulators pending and processed updates
+ @volatile @transient private[spark] var pending = new
mutable.HashMap[(Int, Int), R]()
+ @volatile @transient private var processed = new mutable.HashMap[Int,
mutable.BitSet]()
--- End diff --
I think these can be `@transient lazy val`s. Also, I'd like names or a
comment that explains the keys and values.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]