Dear all, I encountered a NullPointerException while running a simple program like the one below:
> val sparkconf = new SparkConf() > .setMaster(master) > .setAppName("myapp") > // and other setups > > val ssc = new StreamingContext(sparkconf, Seconds(30)) > val flume = new FlumeInputDStream(ssc, flume_sink_ip, flume_sink_port, > StorageLevel.MEMORY_AND_DISK_SER_2) > > val messages = flume.map(x => { > val charset = Charset.forName("UTF-8") > val decoder = charset.newDecoder() > val msg = decoder.decode(x.event.getBody()).toString() > msg > }) > > // "messages.count" does not throw NullPointerException > messages.count.foreachRDD(rdd => { > () > }) > messages.print() If calling "messages.count" or 'messages.count.map', no exception is thrown. If using 'messages.count.foreachRDD', NullPointerException is thrown. Console output snippets: > .... > 14/02/26 10:36:51 INFO storage.BlockManagerMasterActor$BlockManagerInfo: > Registering block manager node-005:36924 with 294.4 MB RAM > 14/02/26 10:37:00 ERROR scheduler.JobScheduler: Error generating jobs for > time 1393382220000 ms > java.lang.NullPointerException > at > org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:547) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:545) > at > org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292) > at > org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292) > at > 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161) > at scala.util.Try$.apply(Try.scala:161) > at > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:161) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:105) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:70) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [error] (run-main-0) java.lang.NullPointerException > java.lang.NullPointerException > at > org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:547) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:545) > at > org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292) > at > org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292) > at > org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161) > at scala.util.Try$.apply(Try.scala:161) > at > org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:161) > at > org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:105) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:70) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [trace] Stack trace suppressed: run last *:runMain for the full output. What is wrong with doing 'foreachRDD' on a 'Count'-ed DStream? Thank you. 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-Count-on-DStream-tp2066.html Sent from the Apache Spark User List mailing list archive at Nabble.com.