Solved! The problem has nothing to do with the class and object refactoring. But in the process of that refactoring I made a change that is similar to your code.
Before this refactoring, I processed the DStream inside the function that I passed to StreamingContext.getOrCreate. Afterwards, I started processing the DStream using the context returned from StreamingContext.getOrCreate, which is what triggered the error. So you should call *fetchTweets* inside *managingContext*. That worked for me. Tiago Tiago Albineli Motta Desenvolvedor de Software - Globo.com ICQ: 32107100 http://programandosemcafeina.blogspot.com On Thu, Oct 22, 2015 at 11:22 AM, Tiago Albineli Motta <[email protected]> wrote: > Can't say what is happening, and I have a similar problem here. > > While for you the source is: > > org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been > initialized > > > For me is: > > org.apache.spark.SparkException: > org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has not > been initialized > > > Here, the problem started after I change my main class to use another class > to execute the stream. > > > Before: > > > object TopStream { > > //everything here > > } > > > After > > > object TopStream { > > // call new TopStream.process( ... ) > > } > > > class TopStream extends Serializable { > > } > > > > > > Tiago Albineli Motta > Desenvolvedor de Software - Globo.com > ICQ: 32107100 > http://programandosemcafeina.blogspot.com > > On Wed, Jul 29, 2015 at 12:59 PM, Sadaf <[email protected]> wrote: > >> Hi >> >> I am new to Spark Streaming and writing a code for twitter connector. >> when i >> run this code more than one time, it gives the following exception. I have >> to create a new hdfs directory for checkpointing each time to make it run >> successfully and moreover it doesn't get stopped. 
>> >> ERROR StreamingContext: Error starting the context, marking it as stopped >> org.apache.spark.SparkException: >> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been >> initialized >> at >> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321) >> at >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) >> at >> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) >> at scala.Option.orElse(Option.scala:257) >> at >> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) >> at >> >> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) >> at >> >> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) >> at >> >> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) >> at >> >> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) >> at >> >> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) >> at >> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> at >> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) >> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) >> at >> >> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) >> at >> >> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227) >> at >> >> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222) >> at >> >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> at >> >> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222) 
>> at >> >> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92) >> at >> >> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73) >> at >> >> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588) >> at >> >> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) >> at twitter.streamingSpark$.twitterConnector(App.scala:38) >> at twitter.streamingSpark$.main(App.scala:26) >> at twitter.streamingSpark.main(App.scala) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) >> at >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:606) >> at >> >> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) >> at >> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) >> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) >> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) >> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) >> >> The relavent code is >> >> def twitterConnector() :Unit = >> { >> val atwitter=managingCredentials() >> >> val ssc=StreamingContext.getOrCreate("hdfsDirectory",()=> { >> managingContext() }) >> fetchTweets(ssc, atwitter ) >> >> ssc.start() // Start the computation >> ssc.awaitTermination() >> >> } >> >> def managingContext():StreamingContext = >> { >> //making spark context >> val conf = new >> SparkConf().setMaster("local[*]").setAppName("twitterConnector") >> val ssc = new StreamingContext(conf, Seconds(1)) >> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) >> import sqlContext.implicits._ >> >> //checkpointing >> ssc.checkpoint("hdfsDirectory") >> ssc >> } >> def fetchTweets (ssc : StreamingContext , atwitter : >> 
Option[twitter4j.auth.Authorization]) : Unit = { >> >> >> val tweets >> >> =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2) >> val twt = tweets.window(Seconds(10),Seconds(10)) >> //checkpoint duration >> /twt.checkpoint(new Duration(1000)) >> >> //processing >> case class Tweet(createdAt:Long, text:String) >> twt.map(status=> >> Tweet(status.getCreatedAt().getTime()/1000, status.getText()) >> ) >> twt.print() >> } >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-starting-Spark-Streaming-Context-tp24063.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> --------------------------------------------------------------------- >> To unsubscribe, e-mail: [email protected] >> For additional commands, e-mail: [email protected] >> >> >
