Sean

DStream.saveAsTextFiles internally calls foreachRDD and saveAsTextFile for each interval:
  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }

  val sparkConf = new SparkConf().setAppName("TwitterRawJSON")
  val ssc = new StreamingContext(sparkConf, Seconds(30))
  stream.saveAsTextFiles("hdfs://localhost:9000/twitterRawJSON")

1) If there are no sliding window calls in this streaming context, will there be just one file written per interval?

2) If there is a sliding window call in the same context, such as

  val hashTags = stream.flatMap(json =>
    DataObjectFactory.createStatus(json).getText.split(" ").filter(_.startsWith("#")))

  val topCounts60 = hashTags.map((_, 1))
    .reduceByKeyAndWindow(_ + _, Seconds(600))
    .map { case (topic, count) => (count, topic) }
    .transform(_.sortByKey(false))

will some files get written multiple times (as long as the interval falls within the window)?

Deenar

>> DStream.foreachRDD gives you an RDD[String] for each interval, of course. I don't
>> think it makes sense to say a DStream can be converted into one RDD, since it is a
>> stream. The past elements are inherently not supposed to stick around for long, and
>> future elements aren't known. You may consider saving each RDD[String] to HDFS, and
>> then simply loading it from HDFS as an RDD[String].
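For reference, a minimal sketch of the save-then-reload pattern suggested in the quoted reply, assuming stream is the DStream[String] from the snippet above; the per-batch directory naming under /twitterRawJSON is illustrative, not part of the original code:

  import org.apache.spark.streaming.Time

  // Write each interval's RDD[String] to its own HDFS directory, keyed by batch time.
  stream.foreachRDD { (rdd, time: Time) =>
    // Each call produces one directory of part files per interval,
    // e.g. hdfs://localhost:9000/twitterRawJSON/batch-1429000000000
    rdd.saveAsTextFile(s"hdfs://localhost:9000/twitterRawJSON/batch-${time.milliseconds}")
  }

  // Later, a separate batch job can load any saved interval back as a plain RDD[String]:
  // val saved = sc.textFile("hdfs://localhost:9000/twitterRawJSON/batch-1429000000000")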
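As a side note on question 2 above: the two-argument reduceByKeyAndWindow(_ + _, Seconds(600)) defaults the slide interval to the batch interval (30 seconds here), so the 600-second window is recomputed every 30 seconds and each batch's data falls into many overlapping windows. A sketch of the three-argument variant with an explicit slide interval (the 60-second value is illustrative):

  // windowDuration = 600s, slideDuration = 60s: emit a result every 60 seconds,
  // each covering the last 600 seconds of data.
  val topCounts = hashTags.map((_, 1))
    .reduceByKeyAndWindow(_ + _, Seconds(600), Seconds(60))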