Solved the issue by setting the same heartbeat interval and acceptable
heartbeat pause in both actor systems. Akka's transport failure detector on
each side gates the association when no heartbeat arrives within the
acceptable pause, so mismatched values produce the [Disassociated] errors
below. On the standalone actor system:
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = DEBUG
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-dead-letters = on
  log-dead-letters-during-shutdown = on
  daemonic = on
  jvm-exit-on-fatal-error = off

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    default-dispatcher.throughput = 15
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    log-remote-lifecycle-events = on
    require-cookie = off
    secure-cookie = off

    netty.tcp {
      hostname = "spark-engine"
      port = 9083
      tcp-nodelay = on
      transport-class = "akka.remote.transport.netty.NettyTransport"
      connection-timeout = 120 s
      execution-pool-size = 4
    }

    transport-failure-detector {
      heartbeat-interval = 4 s
      acceptable-heartbeat-pause = 16 s
    }
  }
}
.set("spark.akka.heartbeat.interval", "4s")
.set("spark.akka.heartbeat.pauses", "16s")
On Tue, Mar 15, 2016 at 9:50 PM, David Gomez Saavedra <[email protected]>
wrote:
> hi there,
>
> I'm trying to set up a simple Spark Streaming app using Akka actors as
> receivers. I followed the example provided and created two apps: one
> creating an actor system and another one subscribing to it. I can see the
> subscription message, but a few seconds later I get an error:
>
> [info] 20:37:40.296 [INFO ] Slf4jLogger started
> [info] 20:37:40.466 [INFO ] Starting remoting
> [info] 20:37:40.871 [INFO ] Remoting started; listening on addresses :[akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.876 [INFO ] Remoting now listens on addresses: [akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.913 [INFO ] starting actor on akka://spark-engine/user/integrationActor
> [info] received subscribe from Actor[akka.tcp://[email protected]:6006/user/Supervisor0/TagsReceiver#536081036]
> [info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out or transport failure detector triggered.
> [info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://[email protected]:6006] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> [info] received unsubscribe from Actor[akka.tcp://[email protected]:6006/user/Supervisor0/TagsReceiver#536081036]
>
> I'm running the master and worker on Docker; the two apps are running on
> my laptop for testing. Here's the code for both:
>
> def main(args: Array[String]) {
>   val conf = new SparkConf()
>     .setMaster(sparkMaster)
>     .setAppName(sparkApp)
>     .set("spark.logConf", "true")
>     .set("spark.driver.port", "7001")
>     .set("spark.fileserver.port", "6002")
>     .set("spark.broadcast.port", "6003")
>     .set("spark.replClassServer.port", "6004")
>     .set("spark.blockManager.port", "6005")
>     .set("spark.executor.port", "6006")
>     .set("spark.akka.heartbeat.interval", "100")
>     .set("spark.akka.logLifecycleEvents", "true")
>     .set("spark.rpc.netty.dispatcher.numThreads", "2")
>     .setJars(sparkJars)
>
>   val ssc = new StreamingContext(conf, Seconds(5))
>
>   ssc.checkpoint("/tmp")
>
>   val tags = ssc.actorStream[Tuple2[UUID, Tuple4[Set[String], Int, Int, Int]]](
>     Props(new GifteeTagStreamingActor(
>       "akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
>     "TagsReceiver")
>
>   tags.print()
>
>   ssc.start()
>   ssc.awaitTermination()
> }
>
>
>
> def main(args: Array[String]) {
>
>   val config = ConfigFactory.load()
>   val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))
>
>   val integrationActor = system.actorOf(Props(new IntegrationActor()),
>     "integrationActor")
>
>   log.info("starting actor on " + integrationActor.path)
>
>   system.awaitTermination()
> }
>
>
> This is my config for the remote actor system to which Spark subscribes:
>
> spark-engine {
>
>   akka {
>     loggers = ["akka.event.slf4j.Slf4jLogger"]
>     loglevel = DEBUG
>     logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
>     log-dead-letters = 10
>     log-dead-letters-during-shutdown = on
>
>     actor {
>       provider = "akka.remote.RemoteActorRefProvider"
>     }
>
>     remote {
>       enabled-transports = ["akka.remote.netty.tcp"]
>       log-remote-lifecycle-events = on
>
>       netty.tcp {
>         hostname = "spark-engine"
>         port = 9083
>       }
>     }
>   }
> }
>
> These are the logs from the executor:
>
> 16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://spark-engine@spark-engine:9083] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
>
>
>
> Any idea why the two actor systems get disassociated?
>
> Thank you very much in advance.
>
> Best
> David
>