Hi,
I’m trying to submit a streaming application on my standalone Spark cluster,
this is my code:
import akka.actor.{Props, ActorSystem, Actor}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri
import akka.stream.ActorMaterializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper
import org.apache.spark.{SparkEnv, SparkContext, SparkConf}
object Boot extends App {
val driverPort = 7777
val conf = new SparkConf().
setAppName("TestSpark").
set("spark.driver.port", driverPort.toString).
set("spark.driver.host", "localhost").
setMaster("spark://<master-uri>:7077").
set("spark.akka.logLifecycleEvents", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val meetup = ssc.actorStream[String](Props[CustomReceiver], "attore")
ChunkReceiver.start(driverPort, "localhost", "attore")
meetup.print
ssc.start
}
object ChunkReceiver {
def start(driverPort:Int, driverHost:String, actorName:String) {
Thread.sleep(5000)
implicit val system = ActorSystem("client")
import system.dispatcher
implicit val materializer = ActorMaterializer()
val url =
s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$actorName"
val actor = SparkEnv.get.actorSystem.actorSelection(url)
val source = Uri("http://stream.meetup.com/2/rsvps")
Http().
singleRequest(HttpRequest(uri = source)).
flatMap { response =>
response.entity.dataBytes.runForeach { chunk =>
actor ! chunk.utf8String
}
}
}
}
class CustomReceiver extends Actor with ActorHelper {
log.info("Starting receiver ...")
override def receive = {
case s:String => store(s)
}
}
And these are the messages generate by Spark:
15/12/21 06:43:24 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51397]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51397
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
15/12/21 06:43:26 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51397]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51397
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51463]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51463
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
15/12/21 06:43:26 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51463]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51463
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:59128]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:59128
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
15/12/21 06:43:26 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:59128]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:59128
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
the application doesn’t terminate with errors, but the CustomReceiver
doesn’t receive any data.
Thank you very much for the help,
Luca Guerra
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-Spark-Standalone-tp25750.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]