First of all, thanks @tdas for looking into my problem.
Yes, I checked it separately and it works fine. For the piece of code
below, there is not a single exception and the values are sent correctly:
val reporter = new MyClassReporter(...)
reporter.send(...)
val out = new FileOutputStream("out123.txt")
val outO = new ObjectOutputStream(out)
outO.writeObject(reporter)
outO.flush()
outO.close()
val in = new FileInputStream("out123.txt")
val inO = new ObjectInputStream(in)
val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
reporterFromFile.send(...)
inO.close()
in.close()
Maybe I am wrong, but I think it would be strange if a class that
implements Serializable and is properly broadcast to the executors could
not be serialized and deserialized.
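For what it's worth, a plain broadcast round trip (without checkpointing)
also works for me. A minimal sketch, with made-up names standing in for my
classes:

import org.apache.spark.{SparkConf, SparkContext}

// Stand-in for MyClassReporter: a plain Serializable class.
class DummyReporter(prefix: String) extends Serializable {
  def send(name: String): Unit = println(s"$prefix.$name")
}

object BroadcastRoundTrip {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("broadcast-test").setMaster("local[2]"))
    // Created on the driver, serialized, and deserialized on each executor.
    val reporterBc = sc.broadcast(new DummyReporter("metrics"))
    sc.parallelize(1 to 4).foreachPartition { _ =>
      reporterBc.value.send("ok") // no ClassCastException on a fresh run
    }
    sc.stop()
  }
}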
I also prepared a slightly different piece of code and received a
slightly different exception. Right now it looks like:

java.lang.ClassCastException: [B cannot be cast to
com.example.sender.MyClassReporter

Since [B is the JVM name for byte[], maybe I am wrong, but it looks like,
when restarting from the checkpoint, Spark does not read the proper block
of memory when reading back the bytes for MyClassReporter.
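If broadcasts really cannot be restored from a checkpoint (the Spark
Streaming programming guide notes that accumulators and broadcast
variables are not recovered from checkpoints), one thing I will try is the
lazily instantiated singleton pattern from the guide, so the broadcast is
re-created on demand after recovery instead of being read back from the
checkpoint. A sketch, using the names from my code:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object ReporterHolder {
  @volatile private var instance: Broadcast[MyClassReporter] = null

  // Lazily (re-)creates the broadcast, so it also exists after a restart
  // from a checkpoint, when broadcasts made at startup are lost.
  def getInstance(sc: SparkContext,
                  config: MyClassConfig,
                  flow: String): Broadcast[MyClassReporter] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(new MyClassReporter(config, flow))
        }
      }
    }
    instance
  }
}

// Usage inside the job: fetch the broadcast on the driver in foreachRDD
// instead of capturing one that was created before checkpoint recovery:
//
// stream.foreachRDD { rdd =>
//   val reporterBc = ReporterHolder.getInstance(rdd.sparkContext, config, flow)
//   rdd.foreachPartition { partition =>
//     partition.foreach { case (n, v, t) => reporterBc.value.send(n, v, t) }
//   }
// }
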
2015-12-16 2:38 GMT+01:00 Tathagata Das <[email protected]>:
> Could you test serializing and deserializing the MyClassReporter class
> separately?
>
> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <[email protected]>
> wrote:
>
>> Below is the full stack trace (the real names of my classes were
>> changed), with a short description of the entries that come from my code:
>>
>> rdd.mapPartitions { partition => // the second stack trace entry points to this line
>>   val sender = broadcastedValue.value // the first stack trace entry points to this line
>> }
>>
>> java.lang.ClassCastException:
>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>> com.example.sender.MyClassReporter
>>   at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>   at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> 2015-12-14 17:10 GMT+01:00 Ted Yu <[email protected]>:
>>
>>> Can you show the complete stack trace for the ClassCastException?
>>>
>>> Please see the following thread:
>>> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>>>
>>> Cheers
>>>
>>> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <[email protected]> wrote:
>>>
>>>> Hey all,
>>>>
>>>> When my streaming application restarts from a failure (from a
>>>> checkpoint), I am receiving a strange error:
>>>>
>>>> java.lang.ClassCastException:
>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>> com.example.sender.MyClassReporter
>>>>
>>>> An instance of MyClassReporter is created on the driver side (with the
>>>> proper config passed as a constructor arg) and broadcast to the
>>>> executors, in order to ensure that there will be only a single instance
>>>> on each worker. Everything goes well up to the place where I get the
>>>> value of the broadcast field and execute a function on it, i.e.
>>>>
>>>> broadcastedValue.value.send(...)
>>>>
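>>>> For completeness, the driver-side setup is roughly the following
>>>> (simplified; myClassConfig, "my-flow", stream, and the StreamingContext
>>>> ssc are placeholders):
>>>>
>>>> val reporter = new MyClassReporter(myClassConfig, "my-flow")
>>>> val broadcastedValue = ssc.sparkContext.broadcast(reporter)
>>>>
>>>> stream.foreachRDD { rdd =>
>>>>   rdd.foreachPartition { partition =>
>>>>     val sender = broadcastedValue.value
>>>>     partition.foreach { case (name, value, ts) =>
>>>>       sender.send(name, value, ts)
>>>>     }
>>>>   }
>>>> }
>>>>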
>>>> Below you can find the definition of MyClassReporter (with its trait):
>>>>
>>>> import java.net.InetSocketAddress
>>>> import java.util.concurrent.Executors
>>>> import scala.concurrent.ExecutionContext
>>>> import scala.util.Try
>>>> import scala.util.control.NonFatal
>>>>
>>>> trait Reporter {
>>>>   def send(name: String, value: String, timestamp: Long): Unit
>>>>   def flush(): Unit
>>>> }
>>>>
>>>> class MyClassReporter(config: MyClassConfig, flow: String)
>>>>     extends Reporter with Serializable {
>>>>
>>>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>>>
>>>>   var counter = 0
>>>>
>>>>   @transient
>>>>   private lazy val sender: GraphiteSender = initialize()
>>>>
>>>>   @transient
>>>>   private lazy val threadPool =
>>>>     ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>>>
>>>>   private def initialize() = {
>>>>     val sender = new Sender(
>>>>       new InetSocketAddress(config.senderConfig.hostname,
>>>>         config.senderConfig.port))
>>>>     sys.addShutdownHook {
>>>>       sender.close()
>>>>     }
>>>>     sender
>>>>   }
>>>>
>>>>   override def send(name: String, value: String, timestamp: Long): Unit = {
>>>>     threadPool.submit(new Runnable {
>>>>       override def run(): Unit = {
>>>>         try {
>>>>           counter += 1
>>>>           if (!sender.isConnected)
>>>>             sender.connect()
>>>>           sender.send(s"$prefix.$name", value, timestamp)
>>>>           if (counter % config.senderConfig.batchSize == 0)
>>>>             sender.flush()
>>>>         } catch {
>>>>           case NonFatal(e) =>
>>>>             println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}")
>>>>             Try(sender.close()).recover {
>>>>               case NonFatal(e) => println(s"Error closing graphite: ${e.getMessage}")
>>>>             }
>>>>         }
>>>>       }
>>>>     })
>>>>   }
>>>>
>>>>   override def flush(): Unit = sender.flush()
>>>> }
>>>>
>>>> Do you have any idea how I can solve this issue? Using a broadcast
>>>> variable helps me keep a single socket open to the service on each
>>>> executor.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: [email protected]
>>>> For additional commands, e-mail: [email protected]
>>>>
>>>>
>>>
>>
>