I prepared a simple example that helps to reproduce the problem:

https://github.com/alberskib/spark-streaming-broadcast-issue

I think this way it will be easier for you to understand the problem
and find a solution (if one exists).

Thanks
Bartek

2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski <albers...@gmail.com>:

> First of all, thanks @tdas for looking into my problem.
>
> Yes, I checked it separately and it is working fine. For the piece of
> code below there is not a single exception and the values are sent
> correctly.
>
>     val reporter = new MyClassReporter(...)
>     reporter.send(...)
>     val out = new FileOutputStream("out123.txt")
>     val outO = new ObjectOutputStream(out)
>     outO.writeObject(reporter)
>     outO.flush()
>     outO.close()
>
>     val in = new FileInputStream("out123.txt")
>     val inO = new ObjectInputStream(in)
>     val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
>     reporterFromFile.send(...)
>     inO.close()
>
> Maybe I am wrong, but I think it would be strange if a class that
> implements Serializable and is properly broadcast to the executors
> could not be serialized and deserialized?
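>
> For completeness, the same check can be written as a small generic
> helper that round-trips an object through a byte array instead of a
> file (a sketch, using only java.io):
>
>     import java.io._
>
>     def roundTrip[T](obj: T): T = {
>       val buffer = new ByteArrayOutputStream()
>       val out = new ObjectOutputStream(buffer)
>       out.writeObject(obj)
>       out.close()
>       val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
>       try in.readObject().asInstanceOf[T] finally in.close()
>     }
>
>     val reporterFromBytes = roundTrip(reporter)
>     reporterFromBytes.send(...)
>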
> I also prepared a slightly different piece of code and received a
> slightly different exception. Right now it looks like:
> java.lang.ClassCastException: [B cannot be cast to
> com.example.sender.MyClassReporter.
>
> Maybe I am wrong, but it looks like when restarting from the checkpoint
> it does not read the proper block of memory when reading back the bytes
> for MyClassReporter.
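>
> For reference, [B is the JVM's runtime name for Array[Byte], so that
> exception means a raw byte array came back where the deserialized
> object was expected. A two-line sketch that raises the same kind of
> exception:
>
>     val block: AnyRef = Array[Byte](1, 2, 3)
>     block.asInstanceOf[MyClassReporter]
>     // java.lang.ClassCastException: [B cannot be cast to com.example.sender.MyClassReporter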
>
> 2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:
>
>> Could you test serializing and deserializing the MyClassReporter class
>> separately?
>>
>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski
>> <albers...@gmail.com> wrote:
>>
>>> Below is the full stacktrace (the real names of my classes were
>>> changed), with a short description of the entries that come from my
>>> code:
>>>
>>> rdd.mapPartitions { partition => // the second stacktrace entry points to this line
>>>   val sender = broadcastedValue.value // the first (main) stacktrace entry points here
>>> }
>>>
>>> java.lang.ClassCastException:
>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>> com.example.sender.MyClassReporter
>>>         at com.example.flow.Calculator
>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:87)
>>>         at com.example.flow.Calculator
>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>         at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>         at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>
>>>> Can you show the complete stack trace for the ClassCastException?
>>>>
>>>> Please see the following thread:
>>>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <albers...@gmail.com> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> When my streaming application restarts from a failure (from a
>>>>> checkpoint) I am receiving a strange error:
>>>>>
>>>>> java.lang.ClassCastException:
>>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>>> com.example.sender.MyClassReporter.
>>>>>
>>>>> An instance of MyClassReporter is created on the driver side (with the
>>>>> proper config passed as a constructor arg) and broadcast to the
>>>>> executors in order to ensure that there is only a single instance on
>>>>> each worker. Everything goes well up to the place where I get the
>>>>> value of the broadcasted field and execute a function on it, i.e.
>>>>> broadcastedValue.value.send(...)
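>>>>>
>>>>> For context, the broadcast is created and consumed roughly like this
>>>>> (a sketch; ssc and stream are stand-ins for my real streaming setup):
>>>>>
>>>>>     val reporter = new MyClassReporter(config, flow)
>>>>>     val broadcastedValue = ssc.sparkContext.broadcast(reporter)
>>>>>
>>>>>     stream.foreachRDD { rdd =>
>>>>>       rdd.mapPartitions { partition =>
>>>>>         val sender = broadcastedValue.value // fails here after restart
>>>>>         partition.map { record =>
>>>>>           sender.send("records.seen", record.toString, System.currentTimeMillis() / 1000)
>>>>>           record
>>>>>         }
>>>>>       }.count()
>>>>>     }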
>>>>>
>>>>> Below you can find the definition of MyClassReporter (with its trait):
>>>>>
>>>>> import java.net.InetSocketAddress
>>>>> import java.util.concurrent.Executors
>>>>> import scala.concurrent.ExecutionContext
>>>>> import scala.util.Try
>>>>> import scala.util.control.NonFatal
>>>>>
>>>>> trait Reporter {
>>>>>   def send(name: String, value: String, timestamp: Long): Unit
>>>>>   def flush(): Unit
>>>>> }
>>>>>
>>>>> class MyClassReporter(config: MyClassConfig, flow: String)
>>>>>   extends Reporter with Serializable {
>>>>>
>>>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>>>
>>>>>   var counter = 0
>>>>>
>>>>>   // recreated lazily on each executor; never serialized
>>>>>   @transient
>>>>>   private lazy val sender: GraphiteSender = initialize()
>>>>>
>>>>>   @transient
>>>>>   private lazy val threadPool =
>>>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>>>
>>>>>   private def initialize() = {
>>>>>     val sender = new Sender(
>>>>>       new InetSocketAddress(config.senderConfig.hostname,
>>>>>         config.senderConfig.port)
>>>>>     )
>>>>>     sys.addShutdownHook {
>>>>>       sender.close()
>>>>>     }
>>>>>     sender
>>>>>   }
>>>>>
>>>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>>>     threadPool.submit(new Runnable {
>>>>>       override def run(): Unit = {
>>>>>         try {
>>>>>           counter += 1
>>>>>           if (!sender.isConnected)
>>>>>             sender.connect()
>>>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>>>           if (counter % config.senderConfig.batchSize == 0)
>>>>>             sender.flush()
>>>>>         } catch {
>>>>>           case NonFatal(e) =>
>>>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}")
>>>>>             Try { sender.close() }.recover {
>>>>>               case NonFatal(closeError) => println(s"Error closing graphite: ${closeError.getMessage}")
>>>>>             }
>>>>>         }
>>>>>       }
>>>>>     })
>>>>>   }
>>>>>
>>>>>   override def flush(): Unit = sender.flush()
>>>>> }
>>>>>
>>>>> Do you have any idea how I can solve this issue? Using a broadcasted
>>>>> variable helps me keep a single socket open to the service on each
>>>>> executor.
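>>>>>
>>>>> One alternative I have seen suggested for keeping a single socket per
>>>>> executor without broadcasting at all is a lazily initialized singleton
>>>>> that lives in each executor JVM (a sketch; ReporterHolder is a
>>>>> hypothetical helper, not code from my application):
>>>>>
>>>>>     object ReporterHolder {
>>>>>       // hypothetical helper: holds one Reporter per executor JVM
>>>>>       private var instance: Reporter = _
>>>>>
>>>>>       def getOrCreate(config: MyClassConfig, flow: String): Reporter =
>>>>>         synchronized {
>>>>>           if (instance == null) instance = new MyClassReporter(config, flow)
>>>>>           instance
>>>>>         }
>>>>>     }
>>>>>
>>>>>     rdd.mapPartitions { partition =>
>>>>>       // config and flow are small and serializable, so they can be
>>>>>       // captured by the closure directly instead of being broadcast
>>>>>       val sender = ReporterHolder.getOrCreate(config, flow)
>>>>>       partition.map { record =>
>>>>>         sender.send("records.seen", record.toString, System.currentTimeMillis() / 1000)
>>>>>         record
>>>>>       }
>>>>>     }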
>>>>>
>>>>
>>>
>>
>
