Hi, I'm consistently getting NullPointerExceptions when trying to use
String vals defined in my main application object inside RDD operations --
even for broadcast vals!
I'm deploying on a standalone cluster with a master and 4 workers on the
same machine, which is not the machine I'm submitting from.

The following example works in spark-shell, but does not when
submitted to the cluster with spark-submit, and also does not work locally.

Is there anything I can do to fix this?
Do vals need to be explicitly synchronized for RDD operations?
One workaround would be to inline the vals (a simplified sketch of what I
mean is below), but the logic in my actual application doesn't allow for this.
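
For example (sketch only; the suffix literal is written directly in the
closure instead of going through the val):

  val l = sc.parallelize(List("a", "b", "c"))
  println(l.map(_ + "-suffix").collect().mkString(","))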

Thanks,
Brandon.

---

spark-shell --master <my-server>

  val suffix = "-suffix"
  val l = sc.parallelize(List("a", "b", "c"))
  println(l.map(_+suffix).collect().mkString(","))

  Result: a-suffix,b-suffix,c-suffix

---

Standalone Cluster with `submit.sh` (my script below):

TestApp.scala:

  package com.adobe.spark

  // Spark.
  import org.apache.spark.{SparkConf,SparkContext}
  import org.apache.spark.broadcast._
  import org.apache.spark.SparkContext._
  import org.apache.spark.storage.StorageLevel

  // Scala.
  import scala.collection.mutable.ArrayBuffer

  object TestApp extends App {
    val memory = "1g"
    val maxCores = "1"
    val conf = new SparkConf()
      .setMaster("spark://myserver:7077")
      //.setMaster("local[4]")
      .setAppName("ValError")
      .setSparkHome("/usr/local/spark-1.0.0")
      .setJars(Seq("/tmp/val-error.jar"))
      .set("spark.executor.memory", memory)
      .set("spark.cores.max", maxCores)
    val sc = new SparkContext(conf)

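    // Plain String val captured by the map closure below.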
    val suffix = "-suffix"
    val l = sc.parallelize(List("a", "b", "c"))
    println(l.map(_+suffix).collect().mkString(","))

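    // Same suffix, but passed through an explicit broadcast variable.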
    val suffix_bc = sc.broadcast(suffix)
    println(l.map(_+suffix_bc.value).collect().mkString(","))

    sc.stop()
  }

build.sbt:

  import AssemblyKeys._

  assemblySettings

  jarName in assembly := "val-error.jar"

  // Load "provided" libraries with `sbt run`.
  run in Compile <<= Defaults.runTask(
    fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)
  )

  name := "TestApp"

  version := "1.0"

  scalaVersion := "2.10.3"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
    "org.slf4j" % "slf4j-simple" % "1.7.7" // Logging.
  )

  resolvers ++= Seq(
    "Akka Repository" at "http://repo.akka.io/releases/"
  )
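
project/plugins.sbt (for completeness; the assembly settings above come from
the sbt-assembly plugin -- the version number here is from memory and may be
off):

  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")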

submit.sh:

  #!/bin/bash

  rm -f *.log driver-id.txt

  JAR=val-error.jar
  CLASS=com.adobe.spark.TestApp
  SPARK=/usr/local/spark-1.0.0

  set -x
  sbt assembly &> assembly.log || exit 1
  scp target/scala-2.10/$JAR eagle:/tmp || exit 2

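  # Submit the assembled jar to the standalone master in cluster deploy mode.
  # The "Driver successfully submitted" line in its output is parsed below.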
  $SPARK/bin/spark-submit \
    --class $CLASS \
    --master spark://myserver:7077 \
    --deploy-mode cluster \
    /tmp/$JAR | tee submit.log
  set +x

  DRIVER_ID=$(grep 'Driver successfully submitted' submit.log | sed 's/Driver successfully submitted as \(.*\)/\1/g')
  [ -z "$DRIVER_ID" ] && exit 3
  echo "$DRIVER_ID" > driver-id.txt

Output:
  anull,bnull,cnull (for the first, non-broadcast map)

Stack trace (for the broadcast variable):
  Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 8 on host eagle.corp.adobe.com: java.lang.NullPointerException
          com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
          com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
          scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
          scala.collection.Iterator$class.foreach(Iterator.scala:727)
          scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
          scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
          scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
          scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
          scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
          scala.collection.AbstractIterator.to(Iterator.scala:1157)
          scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
          scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
          scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
          scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
          org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
          org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
          org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
          org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
          org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
          org.apache.spark.scheduler.Task.run(Task.scala:51)
          org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
          java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          java.lang.Thread.run(Thread.java:744)
  Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

---

When using `local[4]` with `sbt run` (and `setJars` commented out), the job
fails as soon as `collect()` is called on the first map. The only difference
from TestApp.scala above is the conf change sketched below; the stack trace
follows.
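
  // Sketch of the conf for the local run (same TestApp otherwise):
  val conf = new SparkConf()
    .setMaster("local[4]")                  // instead of spark://myserver:7077
    .setAppName("ValError")
    .setSparkHome("/usr/local/spark-1.0.0")
    //.setJars(Seq("/tmp/val-error.jar"))   // commented out for the local run
    .set("spark.executor.memory", memory)
    .set("spark.cores.max", maxCores)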

  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: java.lang.ClassNotFoundException: scala.None$
          java.net.URLClassLoader$1.run(URLClassLoader.java:372)
          java.net.URLClassLoader$1.run(URLClassLoader.java:361)
          java.security.AccessController.doPrivileged(Native Method)
          java.net.URLClassLoader.findClass(URLClassLoader.java:360)
          java.lang.ClassLoader.loadClass(ClassLoader.java:424)
          java.lang.ClassLoader.loadClass(ClassLoader.java:357)
          java.lang.Class.forName0(Native Method)
          java.lang.Class.forName(Class.java:340)
          org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
          java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
          java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
          java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
          java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
          java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
          java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
          java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
          java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
          java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
          org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
          org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
          org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
          java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1840)
          java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1799)
          java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
          java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
          org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
          org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
          org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
          java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          java.lang.Thread.run(Thread.java:745)
  Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



