Thank you Tathagata and Terry for your responses. You were absolutely correct: I had created a dummy DStream (to prevent the Flume channel from filling up) and counted the messages, but I didn't output (print) them, which is why it reported that error. Since I added a call to print(), the error is no longer being thrown.
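For anyone hitting the same NPE, here is a minimal sketch of the broken pattern and the fix. The host, port, app name, and batch interval are placeholders, and it assumes the Spark 1.3 Flume integration; it needs a running Spark/Flume setup to actually execute.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object DummyStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("dummy-stream-sketch") // placeholder name
    val ssc = new StreamingContext(conf, Seconds(60))            // placeholder interval

    // Drain the Flume channel so it does not fill up.
    val events = FlumeUtils.createStream(ssc, "localhost", 41414) // placeholder host/port
    val counts = events.count()

    // Broken version: `counts` had no output operation, so the DStream was
    // never registered with the graph and stayed uninitialized, which
    // surfaced as the NullPointerException in
    // DStreamGraph.getMaxInputStreamRememberDuration.

    // Fix: register any output operation (print, foreachRDD, saveAs*):
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```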
Cheers,
Uthay

On 25 September 2015 at 03:40, Terry Hoo <hujie.ea...@gmail.com> wrote:

> I met this before: in my program, some DStreams were not initialized since
> they were not in the path of an output operation.
>
> You can check if you are in the same case.
>
> Thanks!
> - Terry
>
> On Fri, Sep 25, 2015 at 10:22 AM, Tathagata Das <t...@databricks.com> wrote:
>
>> Are you by any chance setting DStream.remember() with null?
>>
>> On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <uthayan.sutha...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> My streaming job is throwing the exception below at every interval. It
>>> first deletes the checkpoint file and then tries to checkpoint again; is
>>> this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
>>> this issue?
>>>
>>> 15/09/24 16:35:55 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 84.0 (TID 799) in 12 ms on itrac1511.cern.ch (1/8)
>>> 15/09/24 16:35:55 INFO streaming.CheckpointWriter: Deleting hdfs://p01001532067275/user/wdtmon/wdt-dstream-44446/checkpoint-1443104220000
>>> 15/09/24 16:35:55 INFO streaming.CheckpointWriter: Checkpoint for time 1443104220000 ms saved to file 'hdfs://p01001532067275/user/wdtmon/wdt-dstream-44446/checkpoint-1443104220000', took 10696 bytes and 108 ms
>>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Clearing checkpoint data for time 1443104220000 ms
>>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Cleared checkpoint data for time 1443104220000 ms
>>> 15/09/24 16:35:55 ERROR actor.OneForOneStrategy: java.lang.NullPointerException
>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>>>     at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>>>     at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>     at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>     at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>     at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>     at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>     at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:279)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> Cheers,
>>>
>>> Uthay