Below is the full stacktrace(real names of my classes were changed) with
short description of entries from my code:
rdd.mapPartitions{ partition => //this is the line to which second
stacktrace entry is pointing
val sender = broadcastedValue.value // this is the maing place to which
first stacktrace entry is pointing
}
java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter
at com.example.flow.Calculator
$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
at com.example.flow.Calculator
$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
2015-12-14 17:10 GMT+01:00 Ted Yu <[email protected]>:
> Can you show the complete stack trace for the ClassCastException ?
>
> Please see the following thread:
> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>
> Cheers
>
> On Mon, Dec 14, 2015 at 7:33 AM, alberskib <[email protected]> wrote:
>
>> Hey all,
>>
>> When my streaming application is restarting from failure (from
>> checkpoint) I
>> am receiving strange error:
>>
>> java.lang.ClassCastException:
>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>> com.example.sender.MyClassReporter.
>>
>> Instance of B class is created on driver side (with proper config passed
>> as
>> constructor arg) and broadcasted to the executors in order to ensure that
>> on
>> each worker there will be only single instance. Everything is going well
>> up
>> to place where I am getting value of broadcasted field and executing
>> function on it i.e.
>> broadcastedValue.value.send(...)
>>
>> Below you can find definition of MyClassReporter (with trait):
>>
>> trait Reporter{
>> def send(name: String, value: String, timestamp: Long) : Unit
>> def flush() : Unit
>> }
>>
>> class MyClassReporter(config: MyClassConfig, flow: String) extends
>> Reporter
>> with Serializable {
>>
>> val prefix = s"${config.senderConfig.prefix}.$flow"
>>
>> var counter = 0
>>
>> @transient
>> private lazy val sender : GraphiteSender = initialize()
>>
>> @transient
>> private lazy val threadPool =
>> ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>
>> private def initialize() = {
>> val sender = new Sender(
>> new InetSocketAddress(config.senderConfig.hostname,
>> config.senderConfig.port)
>> )
>> sys.addShutdownHook{
>> sender.close()
>> }
>> sender
>> }
>>
>> override def send(name: String, value: String, timestamp: Long) : Unit
>> = {
>> threadPool.submit(new Runnable {
>> override def run(): Unit = {
>> try {
>> counter += 1
>> if (!sender.isConnected)
>> sender.connect()
>> sender.send(s"$prefix.$name", value, timestamp)
>> if (counter % graphiteConfig.batchSize == 0)
>> sender.flush()
>> }catch {
>> case NonFatal(e) => {
>> println(s"Problem with sending metric to graphite
>> $prefix.$name:
>> $value at $timestamp: ${e.getMessage}", e)
>> Try{sender.close()}.recover{
>> case NonFatal(e) => println(s"Error closing graphite
>> ${e.getMessage}", e)
>> }
>> }
>> }
>> }
>> })
>> }
>>
>> Do you have any idea how I can solve this issue? Using broadcasted
>> variable
>> helps me keeping single socket open to the service on executor.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
>>
>