Rather than using the accumulator directly, you can create it lazily
through a singleton and use it as shown below. The accumulator is then
lazily recreated if the driver restarts from a checkpoint.


dstream.transform { rdd =>
    // Singleton method that creates the accumulator,
    // or returns the one that was already created.
    val accum = SingletonObject.getOrCreateAccumulator()
    rdd.map { x =>
        // use accum here
        x
    }
}
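
A minimal sketch of what such a singleton could look like, assuming the
Spark 1.3 Accumulator[Long] API. The object and method names follow the
snippet above; the `sc` parameter, the Long type and the name
"recordCounter" are assumptions made for illustration. The object lives
only in the driver JVM and is not stored in the checkpoint, so a restarted
driver sees `instance == null` and registers a fresh accumulator, which
means the value starts again from 0.

```scala
import org.apache.spark.{Accumulator, SparkContext}

// Hypothetical helper: holds one accumulator per driver JVM.
object SingletonObject {

  // Not part of any checkpointed closure, so it is null again after a driver restart.
  @volatile private var instance: Accumulator[Long] = null

  // Returns the existing accumulator, or registers a new one with the given context.
  def getOrCreateAccumulator(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        // Double-checked so that concurrent batches create only one accumulator.
        if (instance == null) {
          instance = sc.accumulator(0L, "recordCounter")
        }
      }
    }
    instance
  }
}
```

With this variant the call inside transform would be
`SingletonObject.getOrCreateAccumulator(rdd.sparkContext)`, since the
SparkContext is reachable from the RDD of each batch.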


On Wed, Jul 29, 2015 at 1:15 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Hi
>
> I am using Spark Streaming 1.3 with checkpointing, but the job
> fails to recover from the checkpoint on restart.
>
> 1. For the broadcast variable, it fails at the point where I call
> bcvariable.value() in the map function:
> WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
> java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname
>
>  at  mapfunction......
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
>         at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
>         at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>         at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
>         at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> 2. For the accumulator variable it says:
> 15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for
> ResultTask(1, 16)
> java.util.NoSuchElementException: key not found: 2
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:58)
>         at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
>         at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)
>
> It is described in
> https://issues.apache.org/jira/browse/SPARK-5206
>
> I can afford to reset the accumulator to 0 on stream restart. Is it
> possible to get it working?
>
> Thanks
>
