Hello,

I am trying to launch a Spark app (client mode, on a standalone cluster) from a Spray server, using the code below.

When I run it as

$> java -cp <class paths> SprayServer

the SimpleApp.getA() call from SprayService returns -1 (which means it sees the logData RDD as null for HTTP requests), but the statements inside SimpleAppLoader.run() get correct values from SimpleApp.getA().

Any idea why the HTTP requests do not see the cached RDD? I have been trying to debug this for some time without getting anywhere - any pointers would be greatly appreciated.
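One thing I have not ruled out yet is that the route handler and SimpleAppLoader.run() might be looking at different copies of the SimpleApp singleton (a different classloader, or even a different JVM). A minimal probe along these lines - DebugProbe is a hypothetical name, called once from the route and once from SimpleAppLoader.run() - should show whether both sides see the same object:

object DebugProbe {
  def dump(tag: String): Unit = {
    // identical hashes and classloaders => both call sites share one singleton
    println(s"[$tag] SimpleApp identity: ${System.identityHashCode(SimpleApp)}")
    println(s"[$tag] classloader: ${SimpleApp.getClass.getClassLoader}")
    println(s"[$tag] thread: ${Thread.currentThread().getName}")
  }
}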

Thanks.

//////////////////////// BEGIN SPRAY SERVER

import akka.actor._
import akka.io.IO
import spray.can.Http
import spray.routing.HttpService
import scala.concurrent.ops

object SprayServer {

  def main(args: Array[String]) {

    // we need an ActorSystem to host our service
    implicit val system = ActorSystem()

    // create our service actor
    val service = system.actorOf(Props[SprayServiceActor], "test-service")

    // bind our actor to an HTTP port
    IO(Http) ! Http.Bind(service, interface = "0.0.0.0", port = 8085)

    // kick off the Spark driver on a background thread
    ops.spawn {
      SimpleAppLoader.run()
    }
  }
}
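(Side note: scala.concurrent.ops is deprecated as of Scala 2.10 and gone in 2.11, so if it matters, the spawn above can be replaced with a Future - a sketch, assuming the default global execution context is acceptable here:)

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// run the Spark driver off the main thread, same as ops.spawn above
Future { SimpleAppLoader.run() }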

class SprayServiceActor extends SprayService with Actor {

  // the HttpService trait (which SprayService extends) defines only one
  // abstract member, which connects the service's environment to the
  // enclosing actor or test
  def actorRefFactory = context

  def receive = runRoute(rootRoute)
}

trait SprayService extends HttpService {

  def default = path("") {
    println("handling default route")
    val numAs = SimpleApp.getA()   // DOES NOT WORK: always returns -1 here
    get { complete(s"num A: $numAs") }
  }

  def pingRoute = path("ping") {
    get { complete("pong!") }
  }

  def pongRoute = path("pong") {
    get { complete("pong!?") }
  }

  def rootRoute = pingRoute ~ pongRoute ~ default
}
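For completeness, this is how I exercise the routes (assuming the server is up on localhost):

$> curl http://localhost:8085/ping    # responds "pong!"
$> curl http://localhost:8085/pong    # responds "pong!?"
$> curl http://localhost:8085/        # responds "num A: -1" instead of the real count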

////////// END SPRAY, BEGIN SPARK

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.rdd.RDD

object SimpleApp {

  var resultString: String = "Data not assigned"
  var logData: RDD[String] = null

  def main(args: Array[String]) {
    val logFile = "/home/ovik/src/spark/README.md" // should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    // cache the RDD so that later calls to getA() can reuse it
    logData = sc.textFile(logFile, 2).cache()

    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()

    resultString = "Lines with a: %s, Lines with b: %s".format(numAs, numBs)
    println(resultString)
  }

  def getA(): Int = {
    println(resultString)
    if (logData == null) {
      println("**** logData is null!")
      -1
    } else {
      val numAs = logData.filter(line => line.contains("a")).count().toInt
      println(s"**** numAs: $numAs")
      numAs
    }
  }
}
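(Also not related to the bug, but it might make the failure mode more visible: the nullable logData var could be an Option, so "not assigned yet" is explicit rather than a null. A sketch of the same getA(), keeping the -1 sentinel:)

var logData: Option[RDD[String]] = None

def getA(): Int = logData match {
  case None =>
    // no RDD assigned yet (or this copy of the object never saw the assignment)
    println("**** logData is not set!")
    -1
  case Some(rdd) =>
    rdd.filter(line => line.contains("a")).count().toInt
}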

object SimpleAppLoader {

  def main(args: Array[String]) {
    run()
  }

  def run() {

    // arguments for SparkSubmit, exactly as they would be passed to
    // spark-submit on the command line
    val clArgs = Array(
      "--deploy-mode", "client"
      , "--total-executor-cores", "2"
      , "--class", "SimpleApp"
      , "--conf", "spark.shuffle.spill=false"
      , "--conf", "spark.master=spark://troika:7077"
      , "--conf", "spark.driver.memory=128m"
      , "--conf", "spark.executor.memory=128m"
      , "--conf", "spark.eventLog.enabled=true"
      , "--conf", "spark.eventLog.dir=/home/ovik/logs"
      , SparkContext.jarOfClass(this.getClass).get)

    SparkSubmit.main(clArgs)

    val numAs = SimpleApp.getA()    // WORKS

    println(s"numAs is $numAs")
  }
}
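(For reference, my understanding is that the clArgs above amount to the following spark-submit invocation, with the jar resolved via SparkContext.jarOfClass:)

$> spark-submit --deploy-mode client \
     --total-executor-cores 2 \
     --class SimpleApp \
     --conf spark.shuffle.spill=false \
     --conf spark.master=spark://troika:7077 \
     --conf spark.driver.memory=128m \
     --conf spark.executor.memory=128m \
     --conf spark.eventLog.enabled=true \
     --conf spark.eventLog.dir=/home/ovik/logs \
     <application jar>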
