sivasai2006 commented on issue #11753: [SPARK-13316][Streaming]add check to 
avoid registering new DStream when recovering from CP
URL: https://github.com/apache/spark/pull/11753#issuecomment-499868644
 
 
   I faced the same issue in PySpark. 
   
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.streaming import StreamingContext
   
   checkpointDir = "/hadoop/checkpoint/"
   
   def createStreamingContext():
       spark = SparkSession \
           .builder \
           .appName("CheckpointStreaming") \
           .config("spark.debug.maxToStringFields", 100) \
           .getOrCreate()
   
       sc = spark.sparkContext
       ssc = StreamingContext(sc, 30)
       ssc.checkpoint(checkpointDir)
   
       return ssc
   
   
   def process(time, rdd):
       #print("========= %s =========" % str(time))
       try:
           # Get the singleton instance of SparkSession
           spark = getSparkSessionInstance(rdd.context.getConf())
   
           # Convert RDD[String] to RDD[Row] to DataFrame
           rowRdd = rdd.map(lambda w: Row(word=w))
           wordsDataFrame = spark.createDataFrame(rowRdd)
   
           # Creates a temporary view using the DataFrame
           wordsDataFrame.createOrReplaceTempView("words")
   
           # Do word count on table using SQL and print it
           wordCountsDataFrame = spark.sql("select word as total from words")
           wordCountsDataFrame.show()
   
           wordCountsDataFrame.saveAsTextFiles("/hadoop/sample/output/")
   
       except:
           pass
   
   if __name__ == "__main__":
       
       #ssc = createStreamingContext(checkpointDir)
       ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
   
       json_rdd = ssc.textFileStream("/hadoop/sample/data/")
   
       #json_rdd.cache()
       #json_rdd.checkpoint(30)
   
       json_rdd.pprint()
   
       json_rdd.foreachRDD(process)     
   
       #json_rdd.saveAsTextFiles("/hadoop/sample/output/")
   
       ssc.start()
       ssc.awaitTermination() 
   ```
   
   I got the below error,
   
   ```
   19/06/07 08:29:46 ERROR streaming.StreamingContext: Error starting the 
context, marking it as stopped
   org.apache.spark.SparkException: 
org.apache.spark.streaming.dstream.MappedDStream@29d2c46b has not been 
initialized
           at 
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
           at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
           at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
           at scala.Option.orElse(Option.scala:289)
           at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
           at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
           at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
           at 
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
           at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
           at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
           at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
           at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
           at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
           at 
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
           at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
           at 
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
           at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
           at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
           at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
           at 
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
           at 
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
           at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
           at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
           at 
org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
           at ... run in separate thread using 
org.apache.spark.util.ThreadUtils ... ()
           at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
           at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
           at 
org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
           at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
           at py4j.Gateway.invoke(Gateway.java:280)
           at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
           at py4j.commands.CallCommand.execute(CallCommand.java:79)
           at py4j.GatewayConnection.run(GatewayConnection.java:214)
           at java.lang.Thread.run(Thread.java:748)
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to