Hi

I am using Spark Streaming 1.3 with checkpointing, but the job fails to
recover from the checkpoint on restart.

1. For the broadcast variable it says:
WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname
This happens at the point where I call bcvariable.value() in the map function.

        at mapfunction......
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

2. For the accumulator variable it says:
15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for ResultTask(1, 16)
java.util.NoSuchElementException: key not found: 2
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:58)
        at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)

This is described in
https://issues.apache.org/jira/browse/SPARK-5206

I can afford to reset the accumulator to 0 on stream restart. Is it
possible to get this working?
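
To make the question concrete, here is a rough sketch of the kind of thing I
have in mind. It is not something I have verified against checkpoint recovery
in 1.3. The idea is to fetch the counter lazily through a holder instead of
capturing it in the checkpointed closures, so that a restarted driver simply
creates a fresh one starting at 0. CounterHolder and simulateRestart are
hypothetical names, and AtomicLong is only a stand-in for the accumulator so
the sketch runs without Spark on the classpath; in the real job the creation
line would presumably be something like jsc.accumulator(0).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder, not part of Spark. AtomicLong stands in for the
// accumulator; in the real job the instance would be created from the
// JavaSparkContext of the current run (assumption, untested on 1.3).
final class CounterHolder {
    // volatile so the double-checked locking below is safe.
    private static volatile AtomicLong instance;

    private CounterHolder() {
    }

    /** Returns the counter for this driver run, creating it at 0 on first use. */
    static AtomicLong getInstance() {
        AtomicLong local = instance;
        if (local == null) {
            synchronized (CounterHolder.class) {
                local = instance;
                if (local == null) {
                    local = new AtomicLong(0L);
                    instance = local;
                }
            }
        }
        return local;
    }

    /**
     * Illustration only: a real restart starts a new JVM, where the static
     * field is null again. This method mimics that within one process.
     */
    static void simulateRestart() {
        synchronized (CounterHolder.class) {
            instance = null;
        }
    }
}
```

Each batch would then call CounterHolder.getInstance() rather than using a
counter created once before the streaming context was checkpointed. Whether
the same approach also fixes the broadcast variable case is part of what I am
asking.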

Thanks
