Hi
I am using Spark Streaming 1.3 with checkpointing enabled,
but the job fails to recover from the checkpoint on restart.
1. For the broadcast variable it says:
WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname
at the point where I call bcvariable.value() in the map function.
at mapfunction......
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2. For the accumulator variable it says:
15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for ResultTask(1, 16)
java.util.NoSuchElementException: key not found: 2
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)
The accumulator failure is described in
https://issues.apache.org/jira/browse/SPARK-5206
I can afford to reset the accumulator to 0 on stream restart. Is it
possible to get this working?
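One pattern that later versions of the Spark Streaming programming guide suggest for this situation is to avoid capturing the broadcast variable or accumulator in the checkpointed closures. Instead, they are kept in a lazily instantiated singleton and looked up inside the output operation, so they are re-created after the driver restarts. Below is a minimal sketch of that holder in plain Java, untested against 1.3. LazySingleton, getOrCreate, MyConfig and loadConfig are hypothetical names, and the Spark usage is shown only in comments as an assumption.

```java
import java.util.function.Function;

/**
 * Holds one lazily created value per driver JVM. Keep the holder in a static
 * field: after a restart from a checkpoint the new JVM starts with an empty
 * holder, so the first call re-creates the value against the new context
 * instead of reusing a handle restored from the checkpoint.
 */
final class LazySingleton<C, T> {
    // volatile is required for double-checked locking to be safe
    private volatile T instance;

    T getOrCreate(C context, Function<C, T> factory) {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    // Runs at most once per JVM, on first use
                    local = factory.apply(context);
                    instance = local;
                }
            }
        }
        return local;
    }
}

// Assumed usage inside the streaming job (hypothetical names, not verified on 1.3):
//
//   static final LazySingleton<JavaSparkContext, Broadcast<MyConfig>> CONFIG = new LazySingleton<>();
//   static final LazySingleton<JavaSparkContext, Accumulator<Integer>> COUNTER = new LazySingleton<>();
//
//   stream.foreachRDD(rdd -> {
//       JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
//       Broadcast<MyConfig> bc = CONFIG.getOrCreate(jsc, c -> c.broadcast(loadConfig()));
//       Accumulator<Integer> acc = COUNTER.getOrCreate(jsc, c -> c.accumulator(0));
//       // use bc and acc in the RDD operations here
//       return null;
//   });
```

With this approach the accumulator starts again from 0 after a restart, which is acceptable in my case.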
Thanks