That stack trace is quite similar to the one generated when trying to do a collect within a closure. In this case, it feels "wrong" to collect in a closure, but I wonder what the reason behind the NPE is. Curious to know whether they are related.
Here's a very simple example:

rrd1.flatMap(x=> rrd2.collect.flatMap(y=> List(y,x)))
res7: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[10] at flatMap at <console>:17

scala> res7.collect
14/06/13 01:11:48 INFO SparkContext: Starting job: collect at <console>:19
14/06/13 01:11:48 INFO DAGScheduler: Got job 2 (collect at <console>:19) with 3 output partitions (allowLocal=false)
14/06/13 01:11:48 INFO DAGScheduler: Final stage: Stage 4(collect at <console>:19)
14/06/13 01:11:48 INFO DAGScheduler: Parents of final stage: List()
14/06/13 01:11:48 INFO DAGScheduler: Missing parents: List()
14/06/13 01:11:48 INFO DAGScheduler: Submitting Stage 4 (FlatMappedRDD[10] at flatMap at <console>:17), which has no missing parents
14/06/13 01:11:48 INFO DAGScheduler: Submitting 3 missing tasks from Stage 4 (FlatMappedRDD[10] at flatMap at <console>:17)
14/06/13 01:11:48 INFO TaskSchedulerImpl: Adding task set 4.0 with 3 tasks
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:0 as TID 16 on executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:0 as 1850 bytes in 0 ms
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:1 as TID 17 on executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:1 as 1850 bytes in 0 ms
14/06/13 01:11:48 INFO TaskSetManager: Starting task 4.0:2 as TID 18 on executor localhost: localhost (PROCESS_LOCAL)
14/06/13 01:11:48 INFO TaskSetManager: Serialized task 4.0:2 as 1850 bytes in 0 ms
14/06/13 01:11:48 INFO Executor: Running task ID 16
14/06/13 01:11:48 INFO Executor: Running task ID 17
14/06/13 01:11:48 INFO Executor: Running task ID 18
14/06/13 01:11:48 ERROR Executor: Exception in task ID 18
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.collect(RDD.scala:728)
    at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
    at $line45.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:17)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:728)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1079)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/06/13 01:11:48 ERROR Executor: Exception in task ID 16
... same for each partition.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1037)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1021)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1019)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1019)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:637)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:637)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1211)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-kr, Gerard.

On Fri, Jun 13, 2014 at 12:32 AM, bdamos <[email protected]> wrote:

> Hi, I'm consistently getting NullPointerExceptions when trying to use
> String val objects defined in my main application -- even for broadcast
> vals!
> I'm deploying on a standalone cluster with a master and 4 workers on the
> same machine, which is not the machine I'm submitting from.
>
> The following example works in spark-shell, but does not when
> submitted to the cluster with spark-submit, and also does not work locally.
>
> Is there anything I can do to fix this?
> Do vals need to be explicitly synchronized for RDD operations?
> One workaround would be to inline the vals,
> but the logic in my actual application doesn't allow for this.
>
> Thanks,
> Brandon.
>
> ---
>
> sbt-shell --master <my-server>
>
> val suffix = "-suffix"
> val l = sc.parallelize(List("a", "b", "c"))
> println(l.map(_+suffix).collect().mkString(","))
>
> Result: a-suffix,b-suffix,c-suffix
>
> ---
>
> Standalone Cluster with `submit.sh` (my script below):
>
> TestApp.scala:
>
> package com.adobe.spark
>
> // Spark.
> import org.apache.spark.{SparkConf,SparkContext}
> import org.apache.spark.broadcast._
> import org.apache.spark.SparkContext._
> import org.apache.spark.storage.StorageLevel
>
> // Scala.
> import scala.collection.mutable.ArrayBuffer
>
> object TestApp extends App {
>   val memory = "1g"
>   val maxCores = "1"
>   val conf = new SparkConf()
>     .setMaster("spark://myserver:7077")
>     //.setMaster("local[4]")
>     .setAppName("ValError")
>     .setSparkHome("/usr/local/spark-1.0.0")
>     .setJars(Seq("/tmp/val-error.jar"))
>     .set("spark.executor.memory", memory)
>     .set("spark.cores.max", maxCores)
>   val sc = new SparkContext(conf)
>
>   val suffix = "-suffix"
>   val l = sc.parallelize(List("a", "b", "c"))
>   println(l.map(_+suffix).collect().mkString(","))
>
>   val suffix_bc = sc.broadcast(suffix)
>   println(l.map(_+suffix_bc.value).collect().mkString(","))
>
>   sc.stop()
> }
>
> build.sbt:
>
> import AssemblyKeys._
>
> assemblySettings
>
> jarName in assembly := "val-error.jar"
>
> // Load "provided" libraries with `sbt run`.
> run in Compile <<= Defaults.runTask(
>   fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)
> )
>
> name := "TestApp"
>
> version := "1.0"
>
> scalaVersion := "2.10.3"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
>   "org.slf4j" % "slf4j-simple" % "1.7.7" // Logging.
> )
>
> resolvers ++= Seq(
>   "Akka Repository" at "http://repo.akka.io/releases/"
> )
>
> submit.sh:
>
> #!/bin/bash
>
> rm -f *.log driver-id.txt
>
> JAR=val-error.jar
> CLASS=com.adobe.spark.TestApp
> SPARK=/usr/local/spark-1.0.0
>
> set -x
> sbt assembly &> assembly.log || exit 1
> scp target/scala-2.10/$JAR eagle:/tmp || exit 2
>
> $SPARK/bin/spark-submit \
>   --class $CLASS \
>   --master spark://myserver:7077 \
>   --deploy-mode cluster \
>   /tmp/$JAR | tee submit.log
> set +x
>
> DRIVER_ID=$(grep 'Driver successfully submitted' submit.log | sed 's/Driver successfully submitted as \(.*\)/\1/g')
> [ -z $DRIVER_ID ] && exit 3
> echo $DRIVER_ID > driver-id.txt
>
> Output:
> anull,bnull,cnull (For the first part.)
>
> Stack Trace: (For the broadcast var.)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 1.0:0 failed 4 times, most recent failure: Exception failure
> in TID 8 on host eagle.corp.adobe.com: java.lang.NullPointerException
>     com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
>     com.adobe.spark.TestApp$$anonfun$2.apply(App.scala:29)
>     scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>     org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:717)
>     org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>     org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>     org.apache.spark.scheduler.Task.run(Task.scala:51)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     java.lang.Thread.run(Thread.java:744)
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> ---
>
> When using 'local[4]' and `sbt run` with `setJars` commented.
> This is happening when `collect()` is called on the first map.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on
> host localhost: java.lang.ClassNotFoundException: scala.None$
>     java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>     java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>     java.security.AccessController.doPrivileged(Native Method)
>     java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>     java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     java.lang.Class.forName0(Native Method)
>     java.lang.Class.forName(Class.java:340)
>     org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
>     java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>     java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>     org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
>     org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
>     java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1840)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1799)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
>     org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerExceptions-when-using-val-or-broadcast-on-a-standalone-cluster-tp7524.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
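
On the NPE from collect inside a closure at the top of this message: as far as I understand it, an RDD keeps its SparkContext reference in a @transient field, so the copy of rrd2 that gets deserialized on an executor has no context, and collect ends up dereferencing null. The sketch below reproduces only that mechanism, using plain Java serialization and no Spark at all. FakeContext, FakeRdd and TransientDemo are hypothetical names made up for illustration; they are not Spark classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a driver-only context; deliberately not Serializable.
class FakeContext {
  def runJob(): Int = 42
}

// Hypothetical stand-in for an RDD-like handle. The context lives in a
// @transient field, so Java serialization skips it.
class FakeRdd(context: FakeContext) extends Serializable {
  @transient private val ctx: FakeContext = context

  // Dereferences ctx; throws NullPointerException when ctx is null.
  def collect(): Int = ctx.runJob()
}

object TransientDemo {
  // Serialize and deserialize a value in memory, roughly what happens to
  // objects captured by a closure that is shipped to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new FakeRdd(new FakeContext)
    println(original.collect()) // the original still has its context

    val copy = roundTrip(original) // the transient field comes back as null
    try {
      copy.collect()
    } catch {
      case _: NullPointerException =>
        println("NPE: the deserialized copy has no context")
    }
  }
}
```

For the flatMap example itself, the usual way around this is to call rrd2.collect once on the driver and reference the resulting local array inside the closure, so that no RDD is captured by it.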
