Jacek Laskowski created SPARK-13316:
---------------------------------------

             Summary: "SparkException: DStream has not been initialized" when 
restoring StreamingContext from checkpoint and the dstream is created afterwards
                 Key: SPARK-13316
                 URL: https://issues.apache.org/jira/browse/SPARK-13316
             Project: Spark
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 2.0.0
            Reporter: Jacek Laskowski


I faced the issue today, but [it was already reported on 
SO|http://stackoverflow.com/q/35090180/1305344] a couple of days ago. The 
cause is that a dstream is registered after the StreamingContext has been 
recreated from a checkpoint.

It _appears_ that no dstreams may be registered after a StreamingContext has 
been recreated from a checkpoint, i.e. all dstream setup has to happen inside 
the function that creates the context. It is *not* obvious at first.

The code:

{code}
def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Duration(1000))
  ssc.checkpoint(checkpointDir)
  ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)

// The dstream is created (and registered) only here, i.e. after the
// context may already have been restored from the checkpoint.
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
{code}

At the very least this should be described in the docs, and/or validated in 
the code when the streaming computation starts.
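
For reference, a sketch of the pattern that avoids the exception: all dstream 
setup happens inside the function passed to {{StreamingContext.getOrCreate}}, 
so the dstreams already exist (and get initialized) when the context is 
restored from the checkpoint. {{sparkConf}} and {{checkpointDir}} are as in 
the snippet above; the host, port and the {{foreachRDD}} action are just 
placeholders.

{code}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

def createStreamingContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Duration(1000))
  ssc.checkpoint(checkpointDir)

  // All dstreams are created and registered before the context is started,
  // so restoring from the checkpoint can initialize them.
  val socketStream = ssc.socketTextStream("localhost", 9999)  // placeholder host/port
  socketStream.checkpoint(Seconds(1))
  socketStream.foreachRDD(rdd => println(rdd.count()))        // placeholder action

  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
ssc.start()
ssc.awaitTermination()
{code}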

The exception is as follows:

{code}
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ConstantInputDStream@724797ab has not been initialized
  at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:311)
  at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:89)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:332)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:329)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:233)
  at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:228)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:228)
  at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:97)
  at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:589)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:585)
  at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
  at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:585)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:579)
  ... 43 elided
{code}


