You can repartition to 1 partition to generate 1 output file, but that has potentially bad implications for your processing: you would be using only 1 partition and 1 worker for the final stages of processing.
On Thu, Nov 20, 2014 at 11:32 AM, <[email protected]> wrote:
> Hi,
>
> My question is generic:
>
> - Is it possible to save the streams to one single file? If yes, can you
> give me a link or code sample?
>
> - I tried using .saveAsTextFile, but it is creating a different file for
> each stream. I need to update the same file instead of creating a
> different file for each stream.
>
> My use case:
>
> - Retrieve Twitter streams, then extract each tweet and perform sentiment
> analysis on it. Count the number of +ve and -ve sentiments.
>
> - Save the counts in a file. The file should get updated with each stream.
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
>
> object sparkAnalytics {
>
>   def main(args: Array[String]): Unit = {
>     org.apache.log4j.LogManager.getRootLogger().setLevel(org.apache.log4j.Level.ERROR)
>
>     val sentimentAnalyzer = new SentimentAnalyzer()
>
>     System.setProperty("twitter4j.oauth.consumerKey", "***")
>     System.setProperty("twitter4j.oauth.consumerSecret", "**********")
>     System.setProperty("twitter4j.oauth.accessToken", "********")
>     System.setProperty("twitter4j.oauth.accessTokenSecret", "**************")
>
>     val sparkConf = new SparkConf()
>       .setAppName("TwitterSentimentalAnalysis")
>       .setMaster("local[4]")
>       .set("spark.eventLog.enabled", "true")
>
>     // One micro-batch every 2 seconds
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>
>     val stream = TwitterUtils.createStream(ssc, None) // , filters)
>
>     // Strip everything except letters, digits, spaces and '#' before scoring
>     val statuses = stream.map(status =>
>       sentimentAnalyzer.findSentiment(
>         status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")))
>
>     val line = statuses.map(tweetWithSentiment => tweetWithSentiment.getCssClass())
>
>     val pos = line.filter(s => s.contains("sentiment-positive"))
>     val k = pos.count()
>     k.print() // Instead of printing to the console, I have to update a file
>
>     val neg = line.filter(s => s.contains("sentiment-negative"))
>     val n = neg.count()
>     n.print() // Instead of printing to the console, I have to update a file
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> Thanks & Regards
> Jishnu Menath Prathap
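Since each batch here only produces two numbers, one alternative to repartitioning to 1 partition is to bring the counts back to the driver and append them to a single local file. The sketch below assumes a driver-local file and a CSV line layout of `batchTimeMs,positive,negative`; the object name `SentimentCountLog`, the `appendCounts` helper and the file names are hypothetical, and the code uses only the JDK's `java.nio.file` API.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical helper object; the name and the CSV layout are illustrative only.
object SentimentCountLog {

  // Appends one line "batchTimeMs,positive,negative" to a single file.
  // CREATE makes the file on the first call; APPEND keeps earlier lines,
  // so every batch updates the same file instead of producing a new one.
  def appendCounts(path: Path, batchTimeMs: Long, positive: Long, negative: Long): Unit = {
    val line = s"$batchTimeMs,$positive,$negative${System.lineSeparator()}"
    Files.write(
      path,
      line.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND
    )
  }

  // Small local demo: two "batches" appended to a temporary file.
  def main(args: Array[String]): Unit = {
    val out = Files.createTempFile("sentiment-counts", ".csv")
    appendCounts(out, 1000L, 3L, 1L)
    appendCounts(out, 3000L, 0L, 2L)
    print(new String(Files.readAllBytes(out), StandardCharsets.UTF_8))
  }
}
```

In the streaming job, one possible (untested) wiring is to replace the two `print()` calls with a single `line.foreachRDD { (rdd, time) => ... }` whose body computes `rdd.filter(_.contains("sentiment-positive")).count()` and the negative equivalent, then calls `SentimentCountLog.appendCounts(java.nio.file.Paths.get("counts.csv"), time.milliseconds, pos, neg)`. The function passed to `foreachRDD` runs in the driver process, so only the driver writes the file and no stage has to be squeezed into a single partition; on a cluster, the file ends up on the driver machine's local disk.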
