Hi all,

I wrote a custom AccumulatorParam, but in my job it does not seem to be
adding the values. When I tried a plain Int accumulator in the same job,
the values were added correctly.

import org.apache.spark.AccumulatorParam

object MapAccumulatorParam extends AccumulatorParam[Map[Long, Int]] {

  // The "empty" value each task starts accumulating from.
  def zero(initialValue: Map[Long, Int]): Map[Long, Int] = Map.empty

  // Merge two partial maps, summing the counts for keys present in both.
  // The summed entries must be the right operand of ++ so they are kept
  // rather than overwritten by the raw values from m2.
  override def addInPlace(m1: Map[Long, Int], m2: Map[Long, Int]): Map[Long, Int] = {
    val res = m2 ++ m1.map { case (k, v) => k -> (v + m2.getOrElse(k, 0)) }
    println(res)
    res
  }

  // addAccumulator is already defined on AccumulatorParam as a call to
  // addInPlace, so there is no need to duplicate the merge logic here.
}
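
As a sanity check on the merge logic by itself, this is what I expect
addInPlace to do with overlapping keys (plain Scala, no Spark involved):

// Key 1L exists in both maps, so its counts should be summed;
// key 2L only exists on the right, so it is carried over as-is.
val merged = MapAccumulatorParam.addInPlace(Map(1L -> 1), Map(1L -> 2, 2L -> 3))
// merged == Map(1L -> 3, 2L -> 3)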


To use it I am doing something like this:

val acc1 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
val acc2 = sparkEnv.sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)
new SparkJob(sparkEnv, parsedArgs, unpricedCallsAcc, acc1, acc2).runJob() match { ... }
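
Inside SparkJob the accumulators are only ever touched from within an
action, roughly like this (a simplified sketch; rdd and record.id stand in
for the job's real data):

rdd.foreach { record =>
  // One count per record, keyed by id; Spark should merge the per-task
  // maps back on the driver via MapAccumulatorParam.addInPlace.
  acc1.add(Map(record.id -> 1))
}
// Read the total only on the driver, after the action has completed.
println(acc1.value)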


Where sparkEnv is a wrapper for both SQLContext and SparkContext. Is there
any reason my values might not be getting added together? Did I initialize
the accumulators in the wrong place? (It is where my Spark contexts get
created.) When I step through the function I see that I get back a
Map(1 -> 1) when I do

acc1.add(Map(1L -> 1))


but it never starts the add with a nonempty Map. Any ideas?
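
For what it's worth, here is the minimal standalone test I would expect to
work (local mode, nothing but the accumulator, using the addInPlace above):

import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "map-acc-test")
val acc = sc.accumulator(Map.empty[Long, Int])(MapAccumulatorParam)

// 1L appears twice and 2L once across the partitions, so after the
// action finishes the driver should see Map(1L -> 2, 2L -> 1).
sc.parallelize(Seq(1L, 1L, 2L), 2).foreach { k =>
  acc.add(Map(k -> 1))
}
println(acc.value)

Thanks in advance.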
