Another possible reason behind this may be that there are two versions of
Akka present in the classpath, which are interfering with each other. This
could happen in several ways:

1. Launching the Spark application with Scala brings in the Akka that ships
with Scala, which interferes with Spark's Akka.
2. Multiple Akka versions are pulled in through transitive dependencies.
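
One way to check for either scenario is to ask the class loader where the
Akka classes are coming from. The following is only a minimal sketch, assuming
it is run with the same classpath as the failing application; the names
AkkaClasspathCheck and locationsOf are illustrative and are not part of Spark
or Akka.

```scala
import scala.collection.JavaConverters._

// Hypothetical helper (not part of Spark or Akka): lists every classpath
// location that provides a given resource, using only JDK class-loader APIs.
object AkkaClasspathCheck {

  // Prefer the thread's context class loader; fall back to the system loader.
  private def defaultLoader: ClassLoader =
    Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(ClassLoader.getSystemClassLoader)

  // Returns the URLs of all copies of `resource` visible to `loader`.
  def locationsOf(resource: String,
                  loader: ClassLoader = defaultLoader): List[String] =
    loader.getResources(resource).asScala.map(_.toString).toList

  def main(args: Array[String]): Unit = {
    // akka.actor.Props appears in the stack trace, so look for its class file.
    val hits = locationsOf("akka/actor/Props.class")
    hits.foreach(println)
    if (hits.size > 1)
      println(s"WARNING: ${hits.size} copies of akka.actor.Props on the classpath")
  }
}
```

If more than one location is printed, the jars named in those URLs are the
candidates to exclude or align.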

TD


On Thu, Aug 7, 2014 at 2:30 AM, Rohit Rai <ro...@tuplejump.com> wrote:

> Alan/TD,
>
> We are facing the problem in a project going to production.
>
> Was there any progress on this? Can we confirm that this is a
> bug/limitation in the current streaming code? Or is something wrong on
> the user side?
>
> Regards,
> Rohit
>
> *Founder & CEO, **Tuplejump, Inc.*
> ____________________________
> www.tuplejump.com
> *The Data Engineering Platform*
>
>
> On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai <a...@opsclarity.com> wrote:
>
>> The stack trace was from running the Actor count sample directly, without
>> a spark cluster, so I guess the logs would be from both?  I enabled more
>> logging and got this stack trace
>>
>> 14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
>>  14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(alan)
>>  14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie
>> is: off
>>  14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
>>  14/07/25 17:55:27 [INFO] Remoting: Starting remoting
>>  14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on
>> addresses :[akka.tcp://spark@leungshwingchun:52156]
>>  14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [
>> akka.tcp://spark@leungshwingchun:52156]
>>  14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
>>  14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
>>  14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories
>> at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/'
>>  14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at
>> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-local-20140725175527-32f2
>>  14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity
>> 297.0 MB.
>>  14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157
>> with id = ConnectionManagerId(leungshwingchun,52157)
>>  14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register
>> BlockManager
>>  14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager
>> leungshwingchun:52157 with 297.0 MB RAM
>>  14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
>>  14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>>  14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>>  14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at
>> http://192.168.1.233:52158
>>  14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is
>> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
>>  14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
>>  14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
>>  14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at:
>> http://192.168.1.233:52159
>>  14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at
>> http://leungshwingchun:4040
>>  14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
>> org.apache.hadoop.metrics2.lib.MutableRate
>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess
>> with annotation
>> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
>> value=[Rate of successful kerberos logins and latency (milliseconds)],
>> always=false, type=DEFAULT, sampleName=Ops)
>>  14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field
>> org.apache.hadoop.metrics2.lib.MutableRate
>> org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure
>> with annotation
>> @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=,
>> value=[Rate of failed kerberos logins and latency (milliseconds)],
>> always=false, type=DEFAULT, sampleName=Ops)
>>  14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group
>> related metrics
>>  2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from
>> SCDynamicStore
>> 14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not
>> found, setting default realm to empty
>>  14/07/25 17:55:27 [DEBUG] Groups:  Creating new Groups object
>>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the
>> custom-built native-hadoop library...
>>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop
>> with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>>  14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
>>  14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>  14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling
>> back to shell based
>>  14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group
>> mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
>>  14/07/25 17:55:27 [DEBUG] Groups: Group mapping
>> impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
>> cacheTimeout=300000
>>  14/07/25 17:55:28 [INFO] SparkContext: Added JAR
>> file:/Users/alan/dev/spark-dev/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.2.0.jar
>> at http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
>> with timestamp 1406336128212
>>  14/07/25 17:55:28 [DEBUG] JobScheduler: Starting JobScheduler
>>  14/07/25 17:55:28 [INFO] ReceiverTracker: ReceiverTracker started
>>  14/07/25 17:55:28 [INFO] ForEachDStream: metadataCleanupDelay = -1
>>  14/07/25 17:55:28 [INFO] ShuffledDStream: metadataCleanupDelay = -1
>>  14/07/25 17:55:28 [INFO] MappedDStream: metadataCleanupDelay = -1
>>  14/07/25 17:55:28 [INFO] FlatMappedDStream: metadataCleanupDelay = -1
>>  14/07/25 17:55:28 [INFO] PluggableInputDStream: metadataCleanupDelay = -1
>>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Slide time = 2000 ms
>>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Checkpoint interval =
>> null
>>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Remember duration = 2000
>> ms
>>  14/07/25 17:55:28 [INFO] PluggableInputDStream: Initialized and
>> validated org.apache.spark.streaming.dstream.PluggableInputDStream@487c05c8
>>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Slide time = 2000 ms
>>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Checkpoint interval = null
>>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Remember duration = 2000 ms
>>  14/07/25 17:55:28 [INFO] FlatMappedDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.FlatMappedDStream@21ea2c7
>>  14/07/25 17:55:28 [INFO] MappedDStream: Slide time = 2000 ms
>>  14/07/25 17:55:28 [INFO] MappedDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>>  14/07/25 17:55:28 [INFO] MappedDStream: Checkpoint interval = null
>>  14/07/25 17:55:28 [INFO] MappedDStream: Remember duration = 2000 ms
>>  14/07/25 17:55:28 [INFO] MappedDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.MappedDStream@1bc31f72
>>  14/07/25 17:55:28 [INFO] ShuffledDStream: Slide time = 2000 ms
>>  14/07/25 17:55:28 [INFO] ShuffledDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>>  14/07/25 17:55:28 [INFO] ShuffledDStream: Checkpoint interval = null
>>  14/07/25 17:55:28 [INFO] ShuffledDStream: Remember duration = 2000 ms
>>  14/07/25 17:55:28 [INFO] ShuffledDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ShuffledDStream@1e66fc2d
>>  14/07/25 17:55:28 [INFO] ForEachDStream: Slide time = 2000 ms
>>  14/07/25 17:55:28 [INFO] ForEachDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>>  14/07/25 17:55:28 [INFO] ForEachDStream: Checkpoint interval = null
>>  14/07/25 17:55:28 [INFO] ForEachDStream: Remember duration = 2000 ms
>>  14/07/25 17:55:28 [INFO] ForEachDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ForEachDStream@15b35a10
>>  14/07/25 17:55:28 [INFO] ReceiverTracker: Starting 1 receivers
>>  14/07/25 17:55:28 [INFO] SparkContext: Starting job: runJob at
>> ReceiverTracker.scala:275
>>  14/07/25 17:55:28 [INFO] DAGScheduler: Got job 0 (runJob at
>> ReceiverTracker.scala:275) with 1 output partitions (allowLocal=false)
>>  14/07/25 17:55:28 [INFO] DAGScheduler: Final stage: Stage 0(runJob at
>> ReceiverTracker.scala:275)
>>  14/07/25 17:55:28 [INFO] DAGScheduler: Parents of final stage: List()
>>  14/07/25 17:55:28 [INFO] DAGScheduler: Missing parents: List()
>>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitStage(Stage 0)
>>  14/07/25 17:55:28 [DEBUG] DAGScheduler: missing: List()
>>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting Stage 0
>> (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253), which
>> has no missing parents
>>  14/07/25 17:55:28 [DEBUG] DAGScheduler: submitMissingTasks(Stage 0)
>>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for JobGenerator
>> at time 1406336130000
>>  14/07/25 17:55:28 [INFO] JobGenerator: Started JobGenerator at
>> 1406336130000 ms
>>  14/07/25 17:55:28 [INFO] JobScheduler: Started JobScheduler
>>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
>> org.apache.spark.examples.streaming.SerializationRegistry
>>  14/07/25 17:55:28 [INFO] DAGScheduler: Submitting 1 missing tasks from
>> Stage 0 (ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:253)
>>  14/07/25 17:55:28 [DEBUG] DAGScheduler: New pending tasks:
>> Set(ResultTask(0, 0))
>>  14/07/25 17:55:28 [INFO] TaskSchedulerImpl: Adding task set 0.0 with 1
>> tasks
>>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Epoch for TaskSet 0.0: 0
>>  14/07/25 17:55:28 [DEBUG] TaskSetManager: Valid locality levels for
>> TaskSet 0.0: ANY
>>  14/07/25 17:55:28 [DEBUG] TaskSchedulerImpl: parentName: , name:
>> TaskSet_0, runningTasks: 0
>>  14/07/25 17:55:28 [INFO] TaskSetManager: Starting task 0.0:0 as TID 0 on
>> executor localhost: localhost (PROCESS_LOCAL)
>>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
>> org.apache.spark.examples.streaming.SerializationRegistry
>>  14/07/25 17:55:28 [INFO] TaskSetManager: Serialized task 0.0:0 as 1750
>> bytes in 8 ms
>>  14/07/25 17:55:28 [INFO] Executor: Running task ID 0
>>  14/07/25 17:55:28 [INFO] Executor: Fetching
>> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar
>> with timestamp 1406336128212
>>  14/07/25 17:55:28 [INFO] Utils: Fetching
>> http://192.168.1.233:52159/jars/spark-examples-1.0.1-hadoop2.2.0.jar to
>> /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/fetchFileTemp1222383218969222619.tmp
>>  14/07/25 17:55:28 [DEBUG] Utils: fetchFile not using security
>>  14/07/25 17:55:28 [DEBUG] Shell: Failed to detect a valid hadoop home
>> directory
>>  java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
>>  at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:225)
>>  at org.apache.hadoop.util.Shell.<clinit>(Shell.java:250)
>> at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:867)
>>  at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853)
>>  at org.apache.spark.util.Utils$.fetchFile(Utils.scala:421)
>> at
>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
>>  at
>> org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:328)
>>  at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>  at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>  at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>  at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>  at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>  at org.apache.spark.executor.Executor.org
>> $apache$spark$executor$Executor$$updateDependencies(Executor.scala:328)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:164)
>>  at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:722)
>> 14/07/25 17:55:28 [DEBUG] Shell: setsid is not available on this machine.
>> So not using it.
>>  14/07/25 17:55:28 [DEBUG] Shell: setsid exited with exit code 0
>>  14/07/25 17:55:28 [INFO] Executor: Adding
>> file:/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m180000gn/T/spark-7cbbc9fb-1ed4-467e-b8ed-96824ab2e824/spark-examples-1.0.1-hadoop2.2.0.jar
>> to class loader
>>  14/07/25 17:55:28 [DEBUG] KryoSerializer: Running user registrator:
>> org.apache.spark.examples.streaming.SerializationRegistry
>>  14/07/25 17:55:28 [DEBUG] Executor: Task 0's epoch is 0
>>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Registered receiver 0
>>  14/07/25 17:55:28 [INFO] RecurringTimer: Started timer for
>> BlockGenerator at time 1406336129000
>>  14/07/25 17:55:28 [INFO] BlockGenerator: Started BlockGenerator
>>  14/07/25 17:55:28 [INFO] BlockGenerator: Started block pushing thread
>>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Starting receiver
>>  14/07/25 17:55:28 [INFO] ActorReceiver: Supervision tree for receivers
>> initialized at:akka://spark/user/Supervisor0
>>  14/07/25 17:55:28 [INFO] ReceiverSupervisorImpl: Called receiver onStart
>>  creating actor of worker now!!!!!!!!!!!!!!! akka.actor.ActorCell@26ec5d79,
>> class akka.actor.ActorCell
>> 14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
>> 0 from akka://spark
>>  14/07/25 17:55:28 [INFO] ReceiverTracker: Registered receiver for stream
>> 0 from akka://spark
>>  14/07/25 17:55:28 [ERROR] OneForOneStrategy: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>  akka.actor.ActorInitializationException: exception during creation
>>  at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>>  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>  at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>  at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>  at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>  at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.ConfigurationException: configuration problem while
>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>>  at
>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>>  at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>>  at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>  at
>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>  at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>> at akka.actor.Props.newActor(Props.scala:339)
>>  at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>  at akka.actor.ActorCell.create(ActorCell.scala:560)
>> ... 9 more
>> Caused by: java.lang.IllegalArgumentException: constructor public
>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>> arguments [class java.lang.Class, class
>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>>  at akka.actor.Props.cachedActorClass(Props.scala:203)
>> at akka.actor.Props.actorClass(Props.scala:327)
>>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>> ... 20 more
>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>  at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
>> ... 24 more
>> 14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129000
>>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129200
>>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129400
>>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129600
>>  14/07/25 17:55:29 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336129800
>>  14/07/25 17:55:30 [DEBUG] RecurringTimer: Callback for BlockGenerator
>> called at time 1406336130000
>>
>> On Jul 25, 2014, at 3:20 PM, Tathagata Das <tathagata.das1...@gmail.com>
>> wrote:
>>
>> Is this error on the executor or on the driver? Can you provide a larger
>> snippet of the logs, from the driver and, if possible, the executors?
>>
>> TD
>>
>>
>> On Thu, Jul 24, 2014 at 10:28 PM, Alan Ngai <a...@opsclarity.com> wrote:
>>
>>> bump.  any ideas?
>>>
>>> On Jul 24, 2014, at 3:09 AM, Alan Ngai <a...@opsclarity.com> wrote:
>>>
>>> It looks like configuring SparkConf to use the KryoSerializer in
>>> combination with an ActorReceiver makes actor creation fail. I modified
>>> the ActorWordCount example program from
>>>
>>>     val sparkConf = new SparkConf().setAppName("ActorWordCount")
>>>
>>> to
>>>
>>>     val sparkConf = new SparkConf()
>>>       .setAppName("ActorWordCount")
>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>
>>> and I get the stack trace below.  I figured it might be that Kryo
>>> doesn’t know how to serialize/deserialize the actor so I added a registry.
>>>  I also added a default empty constructor to SampleActorReceiver just for
>>> kicks
>>>
>>> class SerializationRegistry extends KryoRegistrator {
>>>   override def registerClasses(kryo: Kryo) {
>>>     kryo.register(classOf[SampleActorReceiver])
>>>   }
>>> }
>>>
>>> …
>>>
>>> case class SampleActorReceiver[T: ClassTag](var urlOfPublisher: String)
>>> extends Actor with ActorHelper {
>>>   def this() = this("")
>>>   ...
>>> }
>>>
>>> ...
>>>     val sparkConf = new SparkConf()
>>>       .setAppName("ActorWordCount")
>>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>       .set("spark.kryo.registrator", "org.apache.spark.examples.streaming.SerializationRegistry")
>>>
>>>
>>> None of this worked, same stack trace.  Any idea what’s going on?  Is
>>> this a known issue and is there a workaround?
>>>
>>> 14/07/24 02:58:19 [ERROR] OneForOneStrategy: configuration problem while
>>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>>  akka.actor.ActorInitializationException: exception during creation
>>>  at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
>>> at akka.actor.ActorCell.create(ActorCell.scala:578)
>>>  at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
>>>  at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
>>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
>>>  at akka.dispatch.Mailbox.run(Mailbox.scala:218)
>>>  at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>  at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>  at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: akka.ConfigurationException: configuration problem while
>>> creating [akka://spark/user/Supervisor0/SampleReceiver] with dispatcher
>>> [akka.actor.default-dispatcher] and mailbox [akka.actor.default-mailbox]
>>>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:723)
>>>  at
>>> akka.remote.RemoteActorRefProvider.actorOf(RemoteActorRefProvider.scala:296)
>>>  at akka.actor.dungeon.Children$class.makeChild(Children.scala:191)
>>> at akka.actor.dungeon.Children$class.actorOf(Children.scala:38)
>>>  at akka.actor.ActorCell.actorOf(ActorCell.scala:338)
>>>  at
>>> org.apache.spark.streaming.receiver.ActorReceiver$Supervisor.<init>(ActorReceiver.scala:152)
>>>  at
>>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>>  at
>>> org.apache.spark.streaming.receiver.ActorReceiver$$anonfun$supervisor$1.apply(ActorReceiver.scala:145)
>>>  at akka.actor.TypedCreatorFunctionConsumer.produce(Props.scala:401)
>>> at akka.actor.Props.newActor(Props.scala:339)
>>>  at akka.actor.ActorCell.newActor(ActorCell.scala:534)
>>>  at akka.actor.ActorCell.create(ActorCell.scala:560)
>>> ... 9 more
>>> Caused by: java.lang.IllegalArgumentException: constructor public
>>> akka.actor.CreatorFunctionConsumer(scala.Function0) is incompatible with
>>> arguments [class java.lang.Class, class
>>> org.apache.spark.examples.streaming.ActorWordCount$$anonfun$2]
>>>  at akka.util.Reflect$.instantiate(Reflect.scala:69)
>>>  at akka.actor.Props.cachedActorClass(Props.scala:203)
>>> at akka.actor.Props.actorClass(Props.scala:327)
>>>  at akka.dispatch.Mailboxes.getMailboxType(Mailboxes.scala:124)
>>>  at akka.actor.LocalActorRefProvider.actorOf(ActorRefProvider.scala:718)
>>> ... 20 more
>>> Caused by: java.lang.IllegalArgumentException: wrong number of arguments
>>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>>  at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
>>>  at akka.util.Reflect$.instantiate(Reflect.scala:65)
>>> ... 24 more
>>>
>>>
>>>
>>
>>
>
