sivasai2006 commented on issue #11753: [SPARK-13316][Streaming] add check to avoid registering new DStream when recovering from CP
URL: https://github.com/apache/spark/pull/11753#issuecomment-499868644

I faced the same issue in PySpark.

```
from pyspark.sql import SparkSession, Row
from pyspark.streaming import StreamingContext

checkpointDir = "/hadoop/checkpoint/"


def createStreamingContext():
    spark = SparkSession \
        .builder \
        .appName("CheckpointStreaming") \
        .config("spark.debug.maxToStringFields", 100) \
        .getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 30)
    ssc.checkpoint(checkpointDir)
    return ssc


def getSparkSessionInstance(sparkConf):
    # Lazily instantiated singleton SparkSession (the standard pattern from
    # the Spark Streaming programming guide; reproduced here because the
    # snippet references it but did not include its definition)
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']


def process(time, rdd):
    # print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Create a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Query the view using SQL and print the result
        wordCountsDataFrame = spark.sql("select word as total from words")
        wordCountsDataFrame.show()
        # Note: saveAsTextFiles is a DStream method, not a DataFrame method;
        # writing the DataFrame out instead
        wordCountsDataFrame.write.mode("append").text("/hadoop/sample/output/")
    except:
        pass


if __name__ == "__main__":
    # ssc = createStreamingContext(checkpointDir)
    ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
    json_rdd = ssc.textFileStream("/hadoop/sample/data/")
    # json_rdd.cache()
    # json_rdd.checkpoint(30)
    json_rdd.pprint()
    json_rdd.foreachRDD(process)
    # json_rdd.saveAsTextFiles("/hadoop/sample/output/")
    ssc.start()
    ssc.awaitTermination()
```

I got the error below:

```
19/06/07 08:29:46 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@29d2c46b has not been initialized
	at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:313)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
	at scala.Option.orElse(Option.scala:289)
	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:234)
	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:229)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:229)
	at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:98)
	at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:103)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:583)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:578)
	at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
```
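
For anyone hitting the same `has not been initialized` failure on restart: the usual cause is that the DStream graph is built *after* `StreamingContext.getOrCreate` returns. On recovery, Spark restores the checkpointed graph, so the `textFileStream`/`pprint`/`foreachRDD` calls made afterwards register new DStreams that the recovered graph never initializes. The Streaming programming guide's recommended pattern is to do all stream setup inside the factory function. A minimal sketch of that pattern, reusing the directories from the snippet above and a stand-in `process` handler:

```
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

checkpointDir = "/hadoop/checkpoint/"


def process(time, rdd):
    # Stand-in for the foreachRDD handler above.
    print("batch at %s: %d records" % (time, rdd.count()))


def createStreamingContext():
    spark = SparkSession.builder.appName("CheckpointStreaming").getOrCreate()
    ssc = StreamingContext(spark.sparkContext, 30)
    ssc.checkpoint(checkpointDir)

    # All DStream creation, transformations, and output operations live
    # inside the factory, so they are part of the checkpointed graph and
    # get re-created (and initialized) on recovery.
    lines = ssc.textFileStream("/hadoop/sample/data/")
    lines.pprint()
    lines.foreachRDD(process)
    return ssc


if __name__ == "__main__":
    # Fresh start: the factory runs and the resulting graph is checkpointed.
    # Restart: the whole graph is restored from checkpointDir and the factory
    # is not called, so no new (uninitialized) DStreams get registered.
    ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
```

With the setup inside the factory, restarting from the checkpoint no longer leaves a newly registered `MappedDStream` outside the recovered graph, which is the situation this PR adds a check for.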
