I can reply from a user’s perspective – on the semantic guarantees I defer to
someone with more experience.
I’ve successfully implemented the following using a custom Accumulable class:
* Created a MapAccumulator with dynamic keys (they are driven by the data
coming in), as opposed to creating many discrete accumulators
* The merge operation adds the values on key conflict
* I’m adding K->Vs to this accumulator in a variety of places (maps,
flatMaps, transforms and updateStateByKey)
* In a foreachRDD at the end of the transformations I’m reading the
accumulator and writing the counters to OpenTSDB
* After this I’m resetting it to the “zero” value (e.g. an empty map)
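To make the merge semantics concrete, here is a minimal, Spark-free sketch of that operation: two partial counter maps combine by summing values on key conflict. In a custom Accumulable this same logic would back addInPlace (merging partial results from tasks) as well as adding a single K->V. The names here are illustrative, not from my actual code.

```scala
object MergeExample {
  // Combine two counter maps, summing the values when a key
  // appears in both ("add the values on key conflict").
  def mergeCounters(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0L) + v)
    }

  def main(args: Array[String]): Unit = {
    val merged = mergeCounters(Map("errors" -> 1L, "hits" -> 2L),
                               Map("hits" -> 3L, "misses" -> 4L))
    println(merged) // "hits" is summed to 5L
  }
}
```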
Everything works as expected in terms of functionality - with 2 caveats:
* On task/job failure you might get duplicate values for the tasks that are
retried in the active job, since adding to an Accumulator in a transformation
is a side effect
* I’m partially working around this by also keying on the RDD time and
overwriting the values in OpenTSDB (an idempotent operation)
* If you have stateful transformations and you use checkpointing, the
accumulator code becomes quite intrusive in your codebase
* You will need a global singleton in your driver and a call to
“getInstance” in a foreachRDD or transform, to force code execution on the
driver
* This is because on restore from a checkpoint your accumulators will be
null, as the checkpoint recovery makes no attempt to reinitialize them (see
SPARK-5206<https://issues.apache.org/jira/browse/SPARK-5206?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22accumulator%20null%22>)
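The “getInstance” workaround can be sketched, minus the Spark dependency, as a lazily initialized driver-side holder (all names here are hypothetical). In the real code, `create` would build the accumulator from the SparkContext (e.g. `sc.accumulable(Map.empty[String, Long])` with your custom AccumulableParam), and getInstance would be called inside foreachRDD so it runs on the driver and re-registers the accumulator after a checkpoint restore has left it null.

```scala
// Spark-free sketch of the driver-side singleton described above:
// lazily (re)creates the value on first use, double-checked so that
// concurrent callers only create it once. After a checkpoint restore
// the field is null again, and the first batch recreates it.
class DriverSingleton[T <: AnyRef](create: () => T) {
  @volatile private var instance: T = null.asInstanceOf[T]

  def getInstance(): T = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = create() // e.g. sc.accumulable(...) in the real code
        }
      }
    }
    instance
  }
}
```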
Hope this helps,
-adrian
From: "Sela, Amit"
Date: Monday, October 26, 2015 at 11:13 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>"
Subject: Accumulators internals and reliability
It seems like there is not much literature about Spark's Accumulators so I
thought I'd ask here:
Do Accumulators reside in a Task? Are they serialized with the task and sent
back on task completion as part of the ResultTask?
Are they reliable? If so, when? Can I rely on an accumulator’s value only after
the task has completed successfully (meaning, in the driver), or also during
task execution (and what about speculative execution)?
What are the limitations on the number (or size) of Accumulators ?
Thanks,
Amit