Hey,

I'm not 100% sure, but from my understanding accumulators are per partition
(so per task, as it's the same thing) and are sent back to the driver with
the task result, where they are merged. If a task ends up running n times
(multiple RDDs depend on this one, a partition is lost later in the chain,
etc.), then the accumulator will count the values from that task n times.
So in short, I don't think you'd gain anything by using an accumulator over
what you are doing right now.
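To make the re-counting issue concrete, here is a minimal local sketch using the Spark 1.x accumulator API (object and app names are made up). The same action runs twice on an uncached RDD, so the map tasks execute twice and the accumulator double-counts:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumDoubleCount {
  // Runs the same action twice on an uncached RDD and returns the final
  // accumulator value, showing that task-side updates are counted again
  // whenever tasks are re-executed.
  def run(): Long = {
    val sc = new SparkContext(
      new SparkConf().setAppName("accum-recount").setMaster("local[2]"))
    try {
      val acc = sc.accumulator(0L)
      val rdd = sc.parallelize(1 to 100, 4).map { x => acc += 1; x }
      rdd.count() // tasks run once: acc is now 100
      rdd.count() // rdd is not cached, tasks run again: acc becomes 200
      acc.value
    } finally sc.stop()
  }
}
```

Caching the RDD would avoid the recomputation for this particular case, but not for partition loss or speculative execution.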

You could maybe coalesce your RDD to num-executors without a shuffle and
then update the sketches. You should end up with one partition per
executor, and thus one sketch per executor. You could then increase the
number of threads per task if the sketches can be used concurrently.
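That pattern could look roughly like the following. The CountMinSketch class here is a toy stand-in (a real implementation would use pairwise-independent hash functions, and the names globalSketch, depth, and width are mine); the point is the coalesce-then-mapPartitions shape, which leaves you with only num-executors sketches to merge:

```scala
import org.apache.spark.rdd.RDD

// Toy count-min sketch: a depth x width table of counters.
// Any CMS implementation with identical dimensions would merge the same way.
class CountMinSketch(val depth: Int, val width: Int) extends Serializable {
  val table = Array.ofDim[Long](depth, width)

  def update(item: Long): Unit =
    for (d <- 0 until depth) {
      // Toy hash per row; a real sketch would use proper hash functions.
      val h = ((item * (d + 1) + d) % width + width) % width
      table(d)(h.toInt) += 1
    }

  // Cell-wise sum of two sketches with the same dimensions.
  def merge(other: CountMinSketch): CountMinSketch = {
    for (d <- 0 until depth; w <- 0 until width)
      table(d)(w) += other.table(d)(w)
    this
  }
}

def globalSketch(items: RDD[Long], numExecutors: Int): CountMinSketch =
  items
    .coalesce(numExecutors, shuffle = false) // ~1 partition per executor
    .mapPartitions { it =>
      val sk = new CountMinSketch(depth = 4, width = 1 << 16)
      it.foreach(sk.update)                  // one sketch per partition
      Iterator(sk)
    }
    .reduce(_ merge _)                       // only numExecutors - 1 merges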

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

>  Hi,
>
> I'm trying to figure out the smartest way to implement a global
> count-min-sketch on accumulators. For now, we are doing that with RDDs. It
> works well, but with one sketch per partition, merging takes too long.
>
> As you probably know, a count-min sketch is a large mutable array of
> arrays of ints. To distribute it, all sketches must have the same size.
> Since each one can be big, and since merging is not free, I would like to
> minimize the number of sketches and maximize reuse and concurrent use of
> them. Ideally, I would like to have just one sketch per worker.
>
> I think accumulables might be the right structures for that, but it seems
> that they are not shared between executors, or even between tasks.
>
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
> (line 289)
>
>   /**
>    * This thread-local map holds per-task copies of accumulators; it is
>    * used to collect the set of accumulator updates to send back to the
>    * driver when tasks complete. After tasks complete, this map is cleared
>    * by `Accumulators.clear()` (see Executor.scala).
>    */
>   private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
>     override protected def initialValue() = Map[Long, Accumulable[_, _]]()
>   }
> localAccums stores an accumulator copy for each task (it's thread-local,
> so I assume each task has its own thread on the executors).
>
> If I understand correctly, each time a task starts, an accumulator is
> initialized locally, updated, then sent back to the driver for merging?
>
> So I guess accumulators may not be the way to go after all.
>
> Any advice?
> Guillaume
> --
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>
