Hi there,

I'm trying to set up a simple Spark Streaming app using Akka actors as receivers. I followed the provided example and created two apps: one creates an actor system and the other subscribes to it. I can see the subscription message, but a few seconds later I get an error:

    [info] 20:37:40.296 [INFO ] Slf4jLogger started
    [info] 20:37:40.466 [INFO ] Starting remoting
    [info] 20:37:40.871 [INFO ] Remoting started; listening on addresses :[akka.tcp://spark-engine@spark-engine:9083]
    [info] 20:37:40.876 [INFO ] Remoting now listens on addresses: [akka.tcp://spark-engine@spark-engine:9083]
    [info] 20:37:40.913 [INFO ] starting actor on akka://spark-engine/user/integrationActor
    [info] received subscribe from Actor[akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036]
    [info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out or transport failure detector triggered.
    [info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
    [info] received unsubscribe from Actor[akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036]

I'm running the master and worker on Docker. The two apps are running on my laptop for testing.

Here's the code for both apps. The Spark Streaming app:

    def main(args: Array[String]) {
      val conf = new SparkConf()
        .setMaster(sparkMaster)
        .setAppName(sparkApp)
        .set("spark.logConf", "true")
        .set("spark.driver.port", "7001")
        .set("spark.fileserver.port", "6002")
        .set("spark.broadcast.port", "6003")
        .set("spark.replClassServer.port", "6004")
        .set("spark.blockManager.port", "6005")
        .set("spark.executor.port", "6006")
        .set("spark.akka.heartbeat.interval", "100")
        .set("spark.akka.logLifecycleEvents", "true")
        .set("spark.rpc.netty.dispatcher.numThreads", "2")
        .setJars(sparkJars)

      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint("/tmp")

      val tags = ssc.actorStream[Tuple2[UUID, Tuple4[Set[String], Int, Int, Int]]](
        Props(new GifteeTagStreamingActor("akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
        "TagsReceiver")
      tags.print()

      ssc.start()
      ssc.awaitTermination()
    }

The app that creates the actor system:

    def main(args: Array[String]) {
      val config = ConfigFactory.load()
      val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))
      val integrationActor = system.actorOf(Props(new IntegrationActor()), "integrationActor")
      log.info("starting actor on " + integrationActor.path)
      system.awaitTermination()
    }

This is my config for the remote actor system that Spark subscribes to:

    spark-engine {
      akka {
        loggers = ["akka.event.slf4j.Slf4jLogger"]
        loglevel = DEBUG
        logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
        log-dead-letters = 10
        log-dead-letters-during-shutdown = on
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          enabled-transports = ["akka.remote.netty.tcp"]
          log-remote-lifecycle-events = on
          netty.tcp {
            hostname = "spark-engine"
            port = 9083
          }
        }
      }
    }

These are the logs from the executor:

    16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://spark-engine@spark-engine:9083] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

Any idea why the two actor systems get disassociated?

Thank you very much in advance.

Best,
David