You're just describing the operation of Spark Streaming at its simplest, without windowing. You get non-overlapping RDDs of the most recent data each time.
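To make the contrast concrete, here is a small pure-Scala model (no Spark dependency) of how batches are grouped into windows. It assumes Spark Streaming's rule that a window ending at time t contains the batches whose times fall in (t - window, t]. It uses the 30-second batch interval and the 600-second window from the question quoted below. The names `TumblingWindowModel`, `windows` and `nonOverlapping` are hypothetical and belong to this sketch, not to any Spark API.

```scala
// A pure-Scala model of Spark Streaming windowing (no Spark dependency).
// Assumption: the window emitted at time t holds every batch whose time b
// satisfies t - windowSec < b <= t. All times are in seconds.
object TumblingWindowModel {

  /** Batch times for a stream with interval `batchSec`, up to `untilSec`. */
  def batchTimes(batchSec: Int, untilSec: Int): Seq[Int] =
    batchSec to untilSec by batchSec

  /** Pairs each window end time (one every `slideSec`) with the batch times it covers. */
  def windows(batchSec: Int, windowSec: Int, slideSec: Int, untilSec: Int): Seq[(Int, Seq[Int])] = {
    val batches = batchTimes(batchSec, untilSec)
    (slideSec to untilSec by slideSec).map { t =>
      t -> batches.filter(b => b > t - windowSec && b <= t)
    }
  }

  /** True if no batch appears in more than one window, i.e. nothing is emitted twice. */
  def nonOverlapping(ws: Seq[(Int, Seq[Int])]): Boolean = {
    val all = ws.flatMap(_._2)
    all.distinct.size == all.size
  }

  def main(args: Array[String]): Unit = {
    // No windowing: each RDD is exactly one 30 s batch.
    val plain = windows(batchSec = 30, windowSec = 30, slideSec = 30, untilSec = 1200)
    // Tumbling window as in the question: window == slide == 600 s.
    val tumbling = windows(batchSec = 30, windowSec = 600, slideSec = 600, untilSec = 1200)
    // Sliding window for contrast: a 600 s window advancing every 30 s.
    val sliding = windows(batchSec = 30, windowSec = 600, slideSec = 30, untilSec = 1200)

    tumbling.foreach { case (t, bs) =>
      println(s"window ending at $t s: ${bs.size} batches (${bs.head} s .. ${bs.last} s)")
    }
    println(s"plain non-overlapping: ${nonOverlapping(plain)}")
    println(s"tumbling non-overlapping: ${nonOverlapping(tumbling)}")
    println(s"sliding non-overlapping: ${nonOverlapping(sliding)}")
  }
}
```

In this model, both the unwindowed stream and `window(Seconds(600), Seconds(600))` place every batch in exactly one group. Only a slide shorter than the window makes the groups overlap.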
On Sun, Mar 29, 2015 at 8:44 AM, Deenar Toraskar <[email protected]> wrote:
> Sean
>
> Thank you very much for your response. I have a requirement to run a function
> only over the new inputs in a Spark Streaming sliding window, i.e. the
> latest batch of events only. Do I just get a new DStream using a slide
> duration equal to the window duration? For example:
>
> val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
> val ssc = new StreamingContext(sparkConf, Seconds(30))
> // write all new tweets in the last 10 mins
> stream.window(Seconds(600), Seconds(600))
>   .saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")
>
> Alternatively, if I could find the time of the new batch, I could do something
> like this:
>
> def saveAsTextFiles(prefix: String, suffix: String = "") {
>   val saveFunc = (rdd: RDD[T], time: Time) => {
>     if (time == currentBatchTime) {
>       val file = rddToFileName(prefix, suffix, time)
>       rdd.saveAsTextFile(file)
>     }
>   }
>   this.foreachRDD(saveFunc)
> }
>
> Regards
> Deenar
>
> P.S. The mail archive on nabble does not seem to show all responses.
>
> -----Original Message-----
> From: Sean Owen [mailto:[email protected]]
> Sent: 22 March 2015 11:49
> To: Deenar Toraskar
> Cc: [email protected]
> Subject: Re: converting DStream[String] into RDD[String] in spark streaming
>
> On Sun, Mar 22, 2015 at 8:43 AM, deenar.toraskar <[email protected]>
> wrote:
>> 1) if there are no sliding window calls in this streaming context,
>> will there be just one file written per interval?
>
> As many files as there are partitions will be written in each interval.
>
>> 2) if there is a sliding window call in the same context, such as
>>
>> val hashTags = stream.flatMap(json =>
>>   DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))
>>
>> val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(600))
>>   .map { case (topic, count) => (count, topic) }
>>   .transform(_.sortByKey(false))
>>
>> will some files get written multiple times (as long as the
>> interval is in the batch)?
>
> I don't think it's right to say files will be written many times, but yes, it
> is my understanding that data will be written many times, since a datum lies
> in many windows.
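The overlap described in the last answer can be reproduced with a small pure-Scala model of the quoted hashtag example. The model assumes a 30-second batch interval, taken from the earlier `StreamingContext` snippet. Because `reduceByKeyAndWindow(_ + _, Seconds(600))` defaults its slide to the batch interval, the stream then yields a new 20-batch window after every batch. The tweets and the names `SlidingHashTagModel` and `windowedCounts` are hypothetical, and the tie-break by tag is an addition of this sketch.

```scala
// A pure-Scala model of reduceByKeyAndWindow(_ + _, Seconds(600)) over 30 s batches
// (no Spark dependency). Assumptions: the slide defaults to the batch interval,
// so one window result is produced per batch, and the input tweets are made up.
object SlidingHashTagModel {
  val batchSec = 30
  val windowSec = 600
  val batchesPerWindow: Int = windowSec / batchSec // 20 batches per window

  /** Hashtags in a tweet's text, mirroring the flatMap in the question. */
  def hashTags(text: String): Seq[String] =
    text.split(" ").toSeq.filter(_.startsWith("#"))

  /** For batch i: (count, tag) pairs over batches i-19..i, highest count first, ties by tag. */
  def windowedCounts(batches: Seq[Seq[String]]): Seq[Seq[(Int, String)]] =
    batches.indices.map { i =>
      batches
        .slice(i - batchesPerWindow + 1, i + 1) // slice clamps a negative start to 0
        .flatten
        .flatMap(hashTags)
        .groupBy(identity)
        .toSeq // convert before mapping so tags with equal counts are not collapsed
        .map { case (tag, occurrences) => (occurrences.size, tag) }
        .sortBy { case (count, tag) => (-count, tag) }
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: one tweet in the first batch, then 24 empty batches.
    val batches: Seq[Seq[String]] =
      Seq("loving #spark #streaming") +: Seq.fill(24)(Seq.empty[String])
    val results = windowedCounts(batches)
    val windowsWithSpark =
      results.count(window => window.exists { case (_, tag) => tag == "#spark" })
    // The single #spark tweet is part of every window result for 20 batches.
    println(s"#spark appears in $windowsWithSpark of ${results.size} window results")
  }
}
```

Because the window slides by one batch at a time, a single tagged tweet is counted in 20 consecutive window results. Saving every result would therefore write that datum 20 times. How many files each save produces is a separate matter that depends on the partition count.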
