Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2017-08-08 Thread dcam
Considering the @transient annotations and the work done in the instance
initializer, not much state is really being broadcast to the executors. It
might be simpler to just create these instances on the executors, rather
than trying to broadcast them?
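
A minimal sketch of that idea, assuming a per-JVM holder object that creates the reporter on first use. `ReporterConfig`, `SocketReporter` and `ReporterHolder` are hypothetical names used only for illustration; they do not come from the code posted in this thread, and the real class would open a socket where this one only formats a string.

```scala
// Hypothetical stand-in for the serializable configuration (case classes are Serializable).
final case class ReporterConfig(hostname: String, port: Int, prefix: String)

// Hypothetical stand-in for the reporter; the real one would hold the Graphite socket.
class SocketReporter(val config: ReporterConfig) {
  def send(name: String, value: String, timestamp: Long): String =
    s"${config.prefix}.$name $value $timestamp"
}

// A Scala object exists once per JVM, so each executor gets at most one reporter.
object ReporterHolder {
  @volatile private var instance: SocketReporter = null

  // Double-checked locking: the reporter is created by the first caller only.
  def getOrCreate(config: ReporterConfig): SocketReporter = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = new SocketReporter(config)
        }
      }
    }
    instance
  }
}
```

Inside `rdd.mapPartitions { ... }` or `rdd.foreachPartition { ... }` the task would call `ReporterHolder.getOrCreate(config)`, so only the small config object travels with the closure and no Broadcast reference has to survive a restart from checkpoint.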



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698p29044.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-17 Thread Bartłomiej Alberski
I prepared a simple example that reproduces the problem:

https://github.com/alberskib/spark-streaming-broadcast-issue

I think this will make it easier for you to understand the problem
and find a solution (if one exists).

Thanks
Bartek


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-17 Thread Shixiong Zhu
Streaming checkpointing doesn't support Accumulators or Broadcast variables. See
https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround:
https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806

Best Regards,
Shixiong Zhu
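
For readers who cannot open the link: one pattern that the Spark Streaming programming guide describes for this situation is a lazily instantiated singleton that re-creates the broadcast variable after the driver restarts. The sketch below follows that pattern and is not necessarily identical to the linked comment. `SenderSettings` and `SenderSettingsBroadcast` are hypothetical names; only `SparkContext.broadcast` and `Broadcast` are real Spark APIs.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical stand-in for the serializable configuration the executors need.
final case class SenderSettings(hostname: String, port: Int, prefix: String)

// Holds the broadcast in a driver-side singleton instead of in the checkpointed closure.
object SenderSettingsBroadcast {
  @volatile private var instance: Broadcast[SenderSettings] = null

  // After a restart from checkpoint `instance` is null again, so the first
  // batch re-broadcasts the settings with the new SparkContext.
  def getInstance(sc: SparkContext, settings: => SenderSettings): Broadcast[SenderSettings] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(settings)
        }
      }
    }
    instance
  }
}
```

The intended use is to call `SenderSettingsBroadcast.getInstance(rdd.sparkContext, settings)` inside `foreachRDD` or `transform`, rather than capturing a `Broadcast` value created before `StreamingContext.getOrCreate`.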


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-16 Thread Bartłomiej Alberski
First of all, thanks @tdas for looking into my problem.

Yes, I checked it separately and it works fine. The code below runs
without any exception and the values are sent correctly.

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

val reporter = new MyClassReporter(...)
reporter.send(...)

// Serialize the reporter to a file.
val out = new FileOutputStream("out123.txt")
val outO = new ObjectOutputStream(out)
outO.writeObject(reporter)
outO.flush()
outO.close()

// Read it back and check that the deserialized instance can still send.
val in = new FileInputStream("out123.txt")
val inO = new ObjectInputStream(in)
val reporterFromFile = inO.readObject().asInstanceOf[MyClassReporter]
reporterFromFile.send(...)
inO.close()

Maybe I am wrong, but I think it would be strange if a class that implements
Serializable and is properly broadcast to the executors could not be
serialized and deserialized.
I also prepared a slightly different piece of code and received a slightly
different exception. Right now it looks like this:
java.lang.ClassCastException: [B cannot be cast to
com.example.sender.MyClassReporter.

Maybe I am wrong, but it looks like, when restarting from the checkpoint, it
does not read the proper block of memory when reading the bytes for
MyClassReporter.
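
The same round trip can be done in memory. It shows why a class built like this passes a plain Java serialization test: `@transient lazy val` fields are skipped when the object is written and are re-initialized on first use after it is read back. This is only a sketch; `FakeSocket`, `DemoReporter` and `RoundTrip` are hypothetical stand-ins, not the classes from this thread. Passing this test says nothing about what happens to a Broadcast reference when a streaming job is recovered from a checkpoint.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Deliberately NOT Serializable, like a real socket wrapper.
class FakeSocket(val target: String)

// Hypothetical stand-in for MyClassReporter: one serialized field, one transient lazy field.
class DemoReporter(val prefix: String) extends Serializable {
  @transient private lazy val socket: FakeSocket = new FakeSocket(prefix)
  def describe(): String = s"socket to ${socket.target}"
}

object RoundTrip {
  // Writes the value with Java serialization and reads a fresh copy back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Calling `describe()` before `RoundTrip.roundTrip(reporter)` forces the lazy field to be initialized, and the round trip still succeeds because the non-serializable `FakeSocket` is never written.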


Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread alberskib
Hey all,

When my streaming application restarts after a failure (from a checkpoint),
I receive a strange error:

java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter.

An instance of MyClassReporter is created on the driver side (with the
proper config passed as a constructor argument) and broadcast to the
executors to ensure that each worker has only a single instance. Everything
goes well up to the point where I get the value of the broadcast variable
and call a function on it, i.e.
broadcastedValue.value.send(...)

Below you can find the definition of MyClassReporter (with its trait):

trait Reporter{
  def send(name: String, value: String, timestamp: Long) : Unit
  def flush() : Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter
  with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"

  var counter = 0

  @transient
  private lazy val sender : GraphiteSender = initialize()

  @transient
  private lazy val threadPool =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
    val sender = new Sender(
      new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
    )
    sys.addShutdownHook{
      sender.close()
    }
    sender
  }

  override def send(name: String, value: String, timestamp: Long) : Unit = {
    threadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          counter += 1
          if (!sender.isConnected)
            sender.connect()
          sender.send(s"$prefix.$name", value, timestamp)
          if (counter % graphiteConfig.batchSize == 0)
            sender.flush()
        }catch {
          case NonFatal(e) => {
            println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
            Try{sender.close()}.recover{
              case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
            }
          }
        }
      }
    })
  }

Do you have any idea how I can solve this issue? Using a broadcast variable
helps me keep a single socket open to the service on each executor.






Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread Ted Yu
Can you show the complete stack trace for the ClassCastException?

Please see the following thread:
http://search-hadoop.com/m/q3RTtgEUHVmJA1T1

Cheers



Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread Bartłomiej Alberski
Below is the full stack trace (the real names of my classes were changed),
with a short description of the entries that come from my code:

rdd.mapPartitions { partition => // the second stack trace entry points to this line
  val sender = broadcastedValue.value // the first stack trace entry points to this line
}

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter
    at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
    at com.example.flow.Calculator$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
