Re: Persist streams to text files
Hi,

You can use the FileUtil.copyMerge API and give it the path to the folder where saveAsTextFile saves the part text files.

Suppose your directory is /a/b/c/. Use:

    FileUtil.copyMerge(FileSystem of source, a/b/c, FileSystem of destination, Path to the merged file (say a/b/c.txt), true (to delete the original dir), the Hadoop Configuration, null)

Thanks.

On Fri, Nov 21, 2014 at 11:31 AM, Jishnu Prathap [via Apache Spark User List] ml-node+s1001560n19449...@n3.nabble.com wrote:

Hi, I am also having a similar problem. Any fix suggested?

Originally posted by GaganBM:

Hi,

I am trying to persist the DStreams to text files. When I use the inbuilt API 'saveAsTextFiles' as:

    stream.saveAsTextFiles(resultDirectory)

this creates a number of subdirectories, one for each batch, and within each subdirectory it creates a bunch of text files for each RDD (I assume).

I am wondering if I can have a single text file for each batch. Is there any API for that? Or else, a single output file for the entire stream?

I tried to manually write from each RDD stream to a text file as:

    stream.foreachRDD(rdd => {
      rdd.foreach(element => {
        fileWriter.write(element)
      })
    })

where 'fileWriter' simply makes use of a Java BufferedWriter to write strings to a file. However, this fails with the exception:

    DStreamCheckpointData.writeObject used
    java.io.BufferedWriter
    java.io.NotSerializableException: java.io.BufferedWriter
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        .

Any help on how to proceed with this?

The information contained in this electronic message and any attachments to this message are intended for the exclusive use of the addressee(s) and may contain proprietary, confidential or privileged information. If you are not the intended recipient, you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately and destroy all copies of this message and any attachments.

WARNING: Computer viruses can be transmitted via email. The recipient should check this email and any attachments for the presence of viruses. The company accepts no liability for any damage caused by any virus transmitted by this email.

www.wipro.com

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Re-Persist-streams-to-text-files-tp19449p19457.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
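A minimal sketch of the FileUtil.copyMerge suggestion in the reply above, assuming the Hadoop 1.x/2.x API that was current when this thread was written (as far as I know the method is no longer present in Hadoop 3.x). The object name MergeParts, the helper mergeParts and the paths are made up for illustration; only FileUtil.copyMerge, FileSystem, Path and Configuration come from Hadoop.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

object MergeParts {
  /** Concatenates every file under `srcDir` into the single file `dstFile`. */
  def mergeParts(srcDir: String, dstFile: String, deleteSource: Boolean): Boolean = {
    val conf = new Configuration()   // picks up core-site.xml if it is on the classpath
    val fs   = FileSystem.get(conf)  // assumption: source and destination share one file system
    // Last argument is an optional string appended after each file; null means none.
    FileUtil.copyMerge(fs, new Path(srcDir), fs, new Path(dstFile), deleteSource, conf, null)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical paths, following the /a/b/c example in the reply.
    val merged = mergeParts("/a/b/c", "/a/b/c.txt", deleteSource = true)
    println(s"merged: $merged")
  }
}
```

With saveAsTextFiles this would have to be run once per batch directory, since each batch is written to its own folder.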
RE: Persist streams to text files
Hi,

Thank you, Akhil ☺ It worked like a charm. I had used the file writer outside rdd.foreach; that might be the reason for the non-serializable exception.

Thanks and regards,
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 1:15 PM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

Here's a quick version to store (append) in your local machine

    val tweets = TwitterUtils.createStream(ssc, None)
    val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    hashTags.foreachRDD(rdds => {
      rdds.foreach(rdd => {
        val fw = new FileWriter("/home/akhld/tags.txt", true) // true = append mode
        println("HashTag = " + rdd)
        fw.write(rdd + "\n")
        fw.close()
      })
    })

Thanks
Best Regards
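The explanation in the message above (a writer created outside the function gets captured by the closure, and the closure then fails Java serialization) can be checked without Spark. The sketch below is only an illustration: the object name ClosureSerialization and its helpers are made up, and it assumes a Scala version in which function literals are serializable (2.11 and later, to my knowledge). I would expect the first check to report false and the second true.

```scala
import java.io.{BufferedWriter, ByteArrayOutputStream, NotSerializableException, ObjectOutputStream, StringWriter}

object ClosureSerialization {
  /** Returns true if `f` survives Java serialization, which Spark needs for closures it ships or checkpoints. */
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  /** The writer is created outside the function, so it becomes part of the closure. */
  def capturingWriter(): String => Unit = {
    val writer = new BufferedWriter(new StringWriter())
    (element: String) => writer.write(element)
  }

  /** The writer is created inside the function body, so nothing non-serializable is captured. */
  def creatingWriterInside(): String => Unit =
    (element: String) => {
      val writer = new BufferedWriter(new StringWriter())
      writer.write(element)
      writer.close()
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(capturingWriter()))
    println(isSerializable(creatingWriterInside()))
  }
}
```

This mirrors why the FileWriter version works: the writer is constructed inside the function passed to rdd.foreach, so only plain values travel with the closure.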
Re: Persist streams to text files
To have a single text file output for each batch, you can repartition the stream to 1 partition and then call saveAsTextFiles:

    stream.repartition(1).saveAsTextFiles(location)

On 21 Nov 2014 11:28, jishnu.prat...@wipro.com wrote:

Hi, I am also having a similar problem. Any fix suggested?
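A small runnable sketch of the repartition(1) suggestion above, assuming Spark Streaming 1.3 or later in local mode. The queue-based source, the object name SinglePartPerBatch and the output prefix are stand-ins for illustration and are not from the thread. Note that saveAsTextFiles still creates one directory per batch, named "prefix-TIME_IN_MS.suffix"; repartitioning only reduces each directory to a single part file.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SinglePartPerBatch {
  /** Streams `lines` as one batch and saves every batch under `prefix`, one part file per batch. */
  def run(prefix: String, lines: Seq[String]): Unit = {
    val conf = new SparkConf().setAppName("SinglePartPerBatch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(1))

    // Stand-in source for the example: a queue holding one RDD with 4 partitions.
    val queue  = mutable.Queue[RDD[String]](ssc.sparkContext.parallelize(lines, 4))
    val stream = ssc.queueStream(queue)

    // One partition per batch, so each batch directory holds a single part-00000 file.
    stream.repartition(1).saveAsTextFiles(prefix, "txt")

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // let a few batches run, then stop
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }

  def main(args: Array[String]): Unit =
    run("/tmp/stream-out/batch", Seq("first line", "second line"))
}
```

Funnelling every batch through one partition removes write parallelism, so this is probably only reasonable when batches are small.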
RE: Persist streams to text files
Hi Akhil,

Thanks for the reply, but it creates different directories. I tried using a FileWriter, but it shows a non-serializable error.

    val stream = TwitterUtils.createStream(ssc, None) //, filters)
    val statuses = stream.map(
      status => sentimentAnalyzer.findSentiment({
        status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")
      })
    )
    val line = statuses.foreachRDD(
      rdd => {
        rdd.foreach(
          tweetWithSentiment => {
            if (!tweetWithSentiment.getLine().isEmpty())
              println(tweetWithSentiment.getCssClass() + " for line := " + tweetWithSentiment.getLine()) // Now I print in console but I need to update it to a file in local machine
          }
        )
      }
    )

Thanks and regards,
Jishnu Menath Prathap

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Friday, November 21, 2014 11:48 AM
To: Jishnu Menath Prathap (WT01 - BAS)
Cc: u...@spark.incubator.apache.org
Subject: Re: Persist streams to text files

To have a single text file output for each batch you can repartition it to 1 and then call the saveAsTextFiles

    stream.repartition(1).saveAsTextFiles(location)
Re: Persist streams to text files
Here's a quick version to store (append) in your local machine:

    val tweets = TwitterUtils.createStream(ssc, None)
    val hashTags = tweets.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    hashTags.foreachRDD(rdds => {
      rdds.foreach(rdd => {
        val fw = new FileWriter("/home/akhld/tags.txt", true) // true = append mode
        println("HashTag = " + rdd)
        fw.write(rdd + "\n")
        fw.close()
      })
    })

Thanks
Best Regards

On Fri, Nov 21, 2014 at 12:12 PM, jishnu.prat...@wipro.com wrote:

Hi Akhil,

Thanks for the reply, but it creates different directories. I tried using a FileWriter, but it shows a non-serializable error.

    val stream = TwitterUtils.createStream(ssc, None) //, filters)
    val statuses = stream.map(
      status => sentimentAnalyzer.findSentiment({
        status.getText().replaceAll("[^A-Za-z0-9 \\#]", "")
      })
    )
    val line = statuses.foreachRDD(
      rdd => {
        rdd.foreach(
          tweetWithSentiment => {
            if (!tweetWithSentiment.getLine().isEmpty())
              println(tweetWithSentiment.getCssClass() + " for line := " + tweetWithSentiment.getLine()) // Now I print in console but I need to update it to a file in local machine
          }
        )
      }
    )

Thanks and regards,
Jishnu Menath Prathap
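The snippet in the message above opens and closes a FileWriter for every element, on whichever executor runs the task, so it targets "your local machine" only when Spark runs in local mode. A possible variation, sketched here under the assumption that each batch is small enough to collect on the driver, opens the file once per batch instead. The object name AppendToLocalFile and its methods are hypothetical and not part of Spark.

```scala
import java.io.{BufferedWriter, FileWriter}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object AppendToLocalFile {
  /** Appends every element of one batch to `path`, opening the file once per batch. */
  def appendBatch(rdd: RDD[String], path: String): Unit = {
    // collect() brings the whole batch to the driver (assumption: it fits in driver memory).
    val lines  = rdd.collect()
    val writer = new BufferedWriter(new FileWriter(path, true)) // true = append mode
    try lines.foreach { line => writer.write(line); writer.newLine() }
    finally writer.close()
  }

  /** Registers the append as the output operation of `stream`; only the path string is captured. */
  def persist(stream: DStream[String], path: String): Unit =
    stream.foreachRDD((rdd: RDD[String]) => appendBatch(rdd, path))
}
```

For the hashtag example this would be called as AppendToLocalFile.persist(hashTags, "/home/akhld/tags.txt"), giving one growing file for the entire stream on the driver machine.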