Hi, I'm consistently getting NullPointerExceptions when trying to use String vals defined in my main application in RDD operations -- even for broadcast vals! I'm deploying on a standalone cluster with a master and 4 workers running on the same machine, which is not the machine I'm submitting from.
The following example works in spark-shell, but does not work when submitted to the cluster with spark-submit, and also fails when run locally. Is there anything I can do to fix this? Do vals need to be explicitly synchronized for RDD operations? One workaround would be to inline the vals (see the one-line sketch after the output below), but the logic in my actual application doesn't allow for this.

Thanks, Brandon.

--- spark-shell (`spark-shell --master <my-server>`):

val suffix = "-suffix"
val l = sc.parallelize(List("a", "b", "c"))
println(l.map(_ + suffix).collect().mkString(","))

Result: a-suffix,b-suffix,c-suffix

--- Standalone cluster with `submit.sh` (my script below):

TestApp.scala:

package com.adobe.spark

// Spark.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast._
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

// Scala.
import scala.collection.mutable.ArrayBuffer

object TestApp extends App {
  val memory = "1g"
  val maxCores = "1"

  val conf = new SparkConf()
    .setMaster("spark://myserver:7077")
    //.setMaster("local[4]")
    .setAppName("ValError")
    .setSparkHome("/usr/local/spark-1.0.0")
    .setJars(Seq("/tmp/val-error.jar"))
    .set("spark.executor.memory", memory)
    .set("spark.cores.max", maxCores)
  val sc = new SparkContext(conf)

  // Plain val captured in the map closure.
  val suffix = "-suffix"
  val l = sc.parallelize(List("a", "b", "c"))
  println(l.map(_ + suffix).collect().mkString(","))

  // Same val, but broadcast first.
  val suffix_bc = sc.broadcast(suffix)
  println(l.map(_ + suffix_bc.value).collect().mkString(","))

  sc.stop()
}

build.sbt:

import AssemblyKeys._

assemblySettings

jarName in assembly := "val-error.jar"

// Load "provided" libraries with `sbt run`.
run in Compile <<= Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
)

name := "TestApp"

version := "1.0"

scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.slf4j" % "slf4j-simple" % "1.7.7" // Logging.
)

resolvers ++= Seq(
  "Akka Repository" at "http://repo.akka.io/releases/"
)

submit.sh:

#!/bin/bash
rm -f *.log driver-id.txt

JAR=val-error.jar
CLASS=com.adobe.spark.TestApp
SPARK=/usr/local/spark-1.0.0

set -x
sbt assembly &> assembly.log || exit 1
scp target/scala-2.10/$JAR eagle:/tmp || exit 2
$SPARK/bin/spark-submit \
  --class $CLASS \
  --master spark://myserver:7077 \
  --deploy-mode cluster \
  /tmp/$JAR | tee submit.log
set +x

DRIVER_ID=$(grep 'Driver successfully submitted' submit.log | sed 's/Driver successfully submitted as \(.*\)/\1/g')
[ -z "$DRIVER_ID" ] && exit 3
echo "$DRIVER_ID" > driver-id.txt

Output (for the first map, with the plain val): anull,bnull,cnull
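(For clarity, by "inlining the vals" above I mean substituting the literal directly into the closure, roughly like the one-liner below. I'd expect that to avoid the problem, since nothing from the enclosing object is captured, but as noted the real application's logic doesn't allow it.)

println(l.map(_ + "-suffix").collect().mkString(","))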
Stack trace (for the broadcast val):

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure in TID 8 on host eagle.corp.adobe.com: java.lang.NullPointerException
        com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
        com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
        org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

--- When using `local[4]` and `sbt run`, with `setJars` commented out:
This is happening when `collect()` is called on the first map.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: java.lang.ClassNotFoundException: scala.None$
        java.net.URLClassLoader$1.run(URLClassLoader.java:372)
        java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        java.security.AccessController.doPrivileged(Native Method)
        java.net.URLClassLoader.findClass(URLClassLoader.java:360)
        java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        java.lang.Class.forName0(Native Method)
        java.lang.Class.forName(Class.java:340)
        org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
        java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
        java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
        org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
        java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1840)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1799)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
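P.S. One thing I notice is that TestApp extends scala.App rather than defining an explicit main method, so its vals are initialized through App's DelayedInit machinery instead of as ordinary constructor fields. I don't know yet whether that is the cause, but for reference here is the variant I plan to try next (identical logic, just restructured, and untested so far; the object name TestAppMain is only for illustration -- the --class argument in submit.sh would change to com.adobe.spark.TestAppMain accordingly):

package com.adobe.spark

import org.apache.spark.{SparkConf, SparkContext}

object TestAppMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://myserver:7077")
      .setAppName("ValError")
      .setSparkHome("/usr/local/spark-1.0.0")
      .setJars(Seq("/tmp/val-error.jar"))
      .set("spark.executor.memory", "1g")
      .set("spark.cores.max", "1")
    val sc = new SparkContext(conf)

    // These vals are now method locals rather than fields of a DelayedInit
    // singleton, so the map closures capture their values directly.
    val suffix = "-suffix"
    val l = sc.parallelize(List("a", "b", "c"))
    println(l.map(_ + suffix).collect().mkString(","))

    val suffix_bc = sc.broadcast(suffix)
    println(l.map(_ + suffix_bc.value).collect().mkString(","))

    sc.stop()
  }
}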
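P.P.S. For the local `sbt run` case, the ClassNotFoundException for scala.None$ is thrown while the executor deserializes the task, which makes me suspect sbt's in-process class loader rather than the application itself. One thing I plan to try (purely an assumption on my part, not something I've verified) is forking the run into a separate JVM via build.sbt:

// Run the app in a forked JVM so task deserialization does not go
// through sbt's own class loader (sbt 0.13 syntax).
fork in run := true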