Hey, I am not 100% sure but from my understanding accumulators are per partition (so per task as its the same) and are sent back to the driver with the task result and merged. When a task needs to be run n times (multiple rdds depend on this one, some partition loss later in the chain etc) then the accumulator will count n times the values from that task. So in short I don't think you'd win from using an accumulator over what you are doing right now.
You could maybe coalesce your rdd to num-executors without a shuffle and then update the sketches. You should endup with 1 partition per executor thus 1 sketch per executor. You could then increase the number of threads per task if you can use the sketches concurrently. Eugen 2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>: > Hi, > > I'm trying to figure out the smartest way to implement a global > count-min-sketch on accumulators. For now, we are doing that with RDDs. It > works well, but with one sketch per partition, merging takes too long. > > As you probably know, a count-min sketch is a big mutable array of array > of ints. To distribute it, all sketches must have the same size. Since it > can be big, and since merging is not free, I would like to minimize the > number of sketches and maximize reuse and conccurent use of the sketches. > Ideally, I would like to just have one sketch per worker. > > I think accumulables might be the right structures for that, but it seems > that they are not shared between executors, or even between tasks. > > > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala > (289) > /** > * This thread-local map holds per-task copies of accumulators; it is > used to collect the set > * of accumulator updates to send back to the driver when tasks complete. > After tasks complete, > * this map is cleared by `Accumulators.clear()` (see Executor.scala). > */ > private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, > _]]]() { > override protected def initialValue() = Map[Long, Accumulable[_, _]]() > } > The localAccums stores an accumulator for each task (it's thread local, so > I assume each task have a unique thread on executors) > > If I understand correctly, each time a task starts, an accumulator is > initialized locally, updated, then sent back to the driver for merging ? > > So I guess, accumulators may not be the way to go, finally. > > Any advice ? > Guillaume > -- > [image: eXenSa] > *Guillaume PITEL, Président* > +33(0)626 222 431 > > eXenSa S.A.S. <http://www.exensa.com/> > 41, rue Périer - 92120 Montrouge - FRANCE > Tel +33(0)184 163 677 / Fax +33(0)972 283 705 >