bump.  any ideas?

On Jul 24, 2014, at 3:09 AM, Alan Ngai <a...@opsclarity.com> wrote:

> it looks like when you configure sparkconfig to use the kryoserializer in 
> combination of using an ActorReceiver, bad things happen.  I modified the 
> ActorWordCount example program from 
> 
>     val sparkConf = new SparkConf().setAppName("ActorWordCount")
> 
> to
> 
>     val sparkConf = new SparkConf()
>       .setAppName("ActorWordCount")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer”)
> 
> and I get the stack trace below.  I figured it might be that Kryo doesn’t 
> know how to serialize/deserialize the actor so I added a registry.  I also 
> added a default empty constructor to SampleActorReceiver just for kicks
> 
> class SerializationRegistry extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[SampleActorReceiver])
>   }
> }
> 
> …
> 
> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
> extends Actor with ActorHelper {
>   def this() = this(“”)
>   ...
> }
> 
> ...
>     val sparkConf = new SparkConf()
>       .setAppName("ActorWordCount")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator", 
> "org.apache.spark.examples.streaming.SerializationRegistry")
> 
> 
> None of this worked, same stack trace.  Any idea what’s going on?  Is this a 
> known issue and is there a workaround?  
> 
> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while 
> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>  akka.actor.ActorInitializationException: exception during creation
>       at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>       at akka.actor.ActorCell.create(ActorCell.scala:578)
>       at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>       at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>       at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: akka.ConfigurationException: configuration problem while creating 
> [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher 
> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>       at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>       at 
> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>       at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>       at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>       at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>       at 
> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>       at 
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>       at 
> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>       at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>       at akka.actor.Props.newActor(Props.scala:339)
>       at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>       at akka.actor.ActorCell.create(ActorCell.scala:560)
>       ... 9 more
> Caused by: java.lang.IllegalArgumentException: constructor public 
> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with 
> arguments [class java.lang.Class, class 
> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>       at akka.util.Reflect$.instantiate(Reflect.scala:69)
>       at akka.actor.Props.cachedActorClass(Props.scala:203)
>       at akka.actor.Props.actorClass(Props.scala:327)
>       at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>       at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>       ... 20 more
> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>       at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>       at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>       at akka.util.Reflect$.instantiate(Reflect.scala:65)
>       ... 24 more
> 

Reply via email to