I prepared a simple example that helps reproduce the problem: https://github.com/alberskib/spark-streaming-broadcast-issue
I think this will make it easier for you to understand the problem and to find a solution (if one exists).

Thanks,
Bartek

2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski <albers...@gmail.com>:

> First of all, thanks @tdas for looking into my problem.
>
> Yes, I checked it separately and it works fine. For the piece of code
> below there is not a single exception and the values are sent correctly:
>
> val reporter = new MyClassReporter(...)
> reporter.send(...)
> val out = new FileOutputStream("out123.txt")
> val outO = new ObjectOutputStream(out)
> outO.writeObject(reporter)
> outO.flush()
> outO.close()
>
> val in = new FileInputStream("out123.txt")
> val inO = new ObjectInputStream(in)
> val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
> reporterFromFile.send(...)
> in.close()
>
> Maybe I am wrong, but I think it would be strange if a class that
> implements Serializable and is properly broadcast to the executors could
> not be serialized and deserialized.
>
> I also prepared a slightly different piece of code and received a
> slightly different exception. Right now it looks like:
>
> java.lang.ClassCastException: [B cannot be cast to
> com.example.sender.MyClassReporter
>
> Maybe I am wrong, but it looks like, when restarting from the checkpoint,
> it does not read the proper block of memory when reading the bytes for
> MyClassReporter.
>
> 2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:
>
>> Could you test serializing and deserializing the MyClassReporter class
>> separately?
>>
>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski
>> <albers...@gmail.com> wrote:
>>
>>> Below is the full stack trace (the real names of my classes were
>>> changed), with a short description of the entries coming from my code:
>>>
>>> rdd.mapPartitions { partition => // the line the second stack trace entry points to
>>>   val sender = broadcastedValue.value // the main place the first stack trace entry points to
>>> }
>>>
>>> java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration
>>> cannot be cast to com.example.sender.MyClassReporter
>>>   at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>>   at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   at java.lang.Thread.run(Thread.java:745)
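A minimal sketch of a possible workaround, assuming that a broadcast variable is simply not recoverable in its original form when a streaming context is restarted from a checkpoint: keep the small, serializable config in the closure and create the heavyweight reporter lazily, once per executor JVM, instead of reading it from a broadcast. ReporterSketch, SenderConfig, SimpleReporter, ReporterHolder and reportMetrics are illustrative names, not code from the repository linked above.

    import org.apache.spark.streaming.dstream.DStream

    object ReporterSketch {

      // Simplified, illustrative stand-ins for the config and reporter from this thread.
      case class SenderConfig(hostname: String, port: Int, prefix: String)

      class SimpleReporter(config: SenderConfig, flow: String) {
        def send(name: String, value: String, timestamp: Long): Unit = {
          // open/reuse a socket to config.hostname:config.port and write the metric;
          // body omitted to keep the sketch short
        }
      }

      // Per-executor lazy singleton: the reporter is re-created from the small,
      // serializable config on each executor, so nothing has to be recovered
      // from a broadcast after a checkpoint restart.
      object ReporterHolder {
        @volatile private var instance: SimpleReporter = _

        def getOrCreate(config: SenderConfig, flow: String): SimpleReporter = {
          if (instance == null) synchronized {
            if (instance == null) instance = new SimpleReporter(config, flow)
          }
          instance
        }
      }

      // Only `config` and `flow` are captured in the closure; the reporter itself
      // is never serialized or broadcast.
      def reportMetrics(stream: DStream[(String, String, Long)],
                        config: SenderConfig,
                        flow: String): Unit = {
        stream.foreachRDD { rdd =>
          rdd.foreachPartition { partition =>
            val reporter = ReporterHolder.getOrCreate(config, flow)
            partition.foreach { case (name, value, ts) => reporter.send(name, value, ts) }
          }
        }
      }
    }

This keeps the single-socket-per-executor property that the broadcast was originally used for, while only a small case class travels with the task closure.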
>>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>
>>>> Can you show the complete stack trace for the ClassCastException?
>>>>
>>>> Please see the following thread:
>>>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> When my streaming application restarts after a failure (from the
>>>>> checkpoint) I am receiving a strange error:
>>>>>
>>>>> java.lang.ClassCastException:
>>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>>> com.example.sender.MyClassReporter
>>>>>
>>>>> An instance of MyClassReporter is created on the driver side (with the
>>>>> proper config passed as a constructor argument) and broadcast to the
>>>>> executors in order to ensure that there is only a single instance on
>>>>> each worker. Everything goes well up to the point where I read the
>>>>> value of the broadcast field and execute a function on it, i.e.
>>>>> broadcastedValue.value.send(...)
>>>>>
>>>>> Below you can find the definition of MyClassReporter (with its trait):
>>>>>
>>>>> trait Reporter {
>>>>>   def send(name: String, value: String, timestamp: Long): Unit
>>>>>   def flush(): Unit
>>>>> }
>>>>>
>>>>> class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter with Serializable {
>>>>>
>>>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>>>
>>>>>   var counter = 0
>>>>>
>>>>>   @transient
>>>>>   private lazy val sender: GraphiteSender = initialize()
>>>>>
>>>>>   @transient
>>>>>   private lazy val threadPool =
>>>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>>>
>>>>>   private def initialize() = {
>>>>>     val sender = new Sender(
>>>>>       new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
>>>>>     )
>>>>>     sys.addShutdownHook {
>>>>>       sender.close()
>>>>>     }
>>>>>     sender
>>>>>   }
>>>>>
>>>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>>>     threadPool.submit(new Runnable {
>>>>>       override def run(): Unit = {
>>>>>         try {
>>>>>           counter += 1
>>>>>           if (!sender.isConnected)
>>>>>             sender.connect()
>>>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>>>           if (counter % graphiteConfig.batchSize == 0)
>>>>>             sender.flush()
>>>>>         } catch {
>>>>>           case NonFatal(e) =>
>>>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
>>>>>             Try(sender.close()).recover {
>>>>>               case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
>>>>>             }
>>>>>         }
>>>>>       }
>>>>>     })
>>>>>   }
>>>>> }
>>>>>
>>>>> Do you have any idea how I can solve this issue? Using the broadcast
>>>>> variable helps me keep a single socket open to the service on each
>>>>> executor.
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
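A minimal sketch of the checkpoint-recovery pattern the original post refers to ("restarting from failure (from checkpoint)"), assuming the standard StreamingContext.getOrCreate setup; the application name, checkpoint path and batch interval below are illustrative. The setup function runs only when no checkpoint data exists; on recovery the job graph is deserialized from the checkpoint instead, which is consistent with the broadcast reporter not being available in its original form after a restart.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointRestartSketch {

      // Illustrative values, not taken from the application discussed above.
      val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

      def createContext(): StreamingContext = {
        val conf = new SparkConf().setAppName("checkpoint-restart-sketch")
        val ssc = new StreamingContext(conf, Seconds(10))
        ssc.checkpoint(checkpointDir)
        // The DStream graph (and any sc.broadcast(...) call) is set up here.
        // This function runs only on a fresh start; on a restart the graph is
        // rebuilt from the checkpoint data instead.
        ssc
      }

      def main(args: Array[String]): Unit = {
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }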