Re: Writing collection to file error
To get the results in a single file, you could do a repartition(1) and then save it:

    ratesAndPreds.repartition(1).saveAsTextFile("/path/CFOutput")

Thanks
Best Regards

On Mon, Nov 24, 2014 at 8:32 PM, Saurabh Agrawal wrote:

> Thanks for your help Akhil, however, this is creating an output folder and
> storing the result sets in multiple files. Also the record count in the
> result set seems to have multiplied!! Is there any other way to achieve
> this?
>
> Thanks!!
>
> Regards,
> Saurabh Agrawal
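On the multiplied record count mentioned above: a likely cause (an assumption worth checking against the data, not something the thread confirms) is duplicate (user, product) keys in the ratings file. join emits one output row per matching pair of left and right records, so duplicate keys multiply rows. A minimal sketch that deduplicates before the join and also produces a single part file:

    // Sketch only: assumes duplicate (user, product) ratings are true
    // repeats, so keeping an arbitrary one of them is acceptable.
    val dedupedRatings = ratings
      .map { case Rating(user, product, rate) => ((user, product), rate) }
      .reduceByKey((a, b) => a)        // one rating per (user, product) key
    val ratesAndPreds = dedupedRatings.join(predictions)
    ratesAndPreds
      .coalesce(1)                     // one partition => one part-00000 file
      .saveAsTextFile("/path/CFOutput")

Note that saveAsTextFile always creates a directory of part files; with a single partition that directory holds just one part-00000, which can be renamed or moved afterwards.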
RE: Writing collection to file error
Thanks for your help Akhil, however, this is creating an output folder and storing the result sets in multiple files. Also the record count in the result set seems to have multiplied!! Is there any other way to achieve this?

Thanks!!

Regards,

Saurabh Agrawal
Vice President

Markit
Green Boulevard
B-9A, Tower C
3rd Floor, Sector - 62,
Noida 201301, India
+91 120 611 8274 Office

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, November 24, 2014 5:55 PM
To: Saurabh Agrawal
Cc: user@spark.apache.org
Subject: Re: Writing collection to file error

Hi Saurabh,

Here your ratesAndPreds is an RDD of type RDD[((Int, Int), (Double, Double))], not an Array. If you want to save it to disk, you can simply call saveAsTextFile with the target location.

So change your last line from this:

    ratesAndPreds.foreach(pw.println)

to this:

    ratesAndPreds.saveAsTextFile("/path/CFOutput")

Thanks
Best Regards
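If the goal really is a single local file and the result set is small, one more option (sketched here under the assumption that ratesAndPreds fits in driver memory) is to collect it to the driver and write it there; the PrintWriter then never leaves the driver, so the serialization failure shown in the original message below cannot occur:

    import java.io.{File, PrintWriter}

    // Safe only when the collected result fits in driver memory.
    val pw = new PrintWriter(new File("/path/CFOutput.txt"))
    try {
      ratesAndPreds.collect().foreach(pw.println)
    } finally {
      pw.close()
    }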
Re: Writing collection to file error
Hi Saurabh,

Here your ratesAndPreds is an RDD of type RDD[((Int, Int), (Double, Double))], not an Array. If you want to save it to disk, you can simply call saveAsTextFile with the target location.

So change your last line from this:

    ratesAndPreds.foreach(pw.println)

to this:

    ratesAndPreds.saveAsTextFile("/path/CFOutput")

Thanks
Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal wrote:

> import java.io.{File, PrintWriter}   // needed for the PrintWriter below
> import org.apache.spark.mllib.recommendation.ALS
> import org.apache.spark.mllib.recommendation.Rating
>
> // Load and parse the data
> val data = sc.textFile("/path/CFReady.txt")
> val ratings = data.map(_.split('\t') match {
>   case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
> })
>
> // Build the recommendation model using ALS
> val rank = 50
> val numIterations = 100
> val model = ALS.train(ratings, rank, numIterations, 0.10)
>
> // Evaluate the model on rating data
> val usersProducts = ratings.map { case Rating(user, product, rate) =>
>   (user, product)
> }
> val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>
>   ((user, product), rate)
> }
> val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
>   ((user, product), rate)
> }.join(predictions)
>
> // Mean squared error between actual and predicted ratings
> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
>   val err = r1 - r2
>   err * err
> }.mean()
> println("Mean Squared Error = " + MSE)
>
> val pw = new PrintWriter(new File("/path/CFOutput.txt"))
> ratesAndPreds.foreach(pw.println)
>
> Hi,
>
> Consider the highlighted code. I am trying to write the output of the
> ratesAndPreds array to disk, but I get this error:
>
> Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
>         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
>         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
>         at $iwC$$iwC$$iwC.<init>(<console>:45)
>         at $iwC$$iwC.<init>(<console>:47)
>         at $iwC.<init>(<console>:49)
>         at <init>(<console>:51)
>         at .<init>(<console>:55)
>         at .<clinit>(<console>)
>         at .<init>(<console>:7)
>         at .<clinit>(<console>)
>         at $print(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>         at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>         at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>         at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
>         at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
>         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
>         at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
>         at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
>         at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
>         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
>         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
>         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
>         at org.apache.spark.repl.Main$.main(Main.scala:31)
>         at org.apache.spark.repl.Main.main(Main.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
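The Task not serializable failure above comes from the last line of the quoted code: the closure passed to foreach captures pw, and java.io.PrintWriter is not serializable, so Spark cannot ship the closure to the executors. Also note that saveAsTextFile writes each element's toString, so rows come out as ((user,product),(rate,pred)). If plain columns are preferred, a small sketch (the tab-separated layout is just one possible choice) that formats each record before saving:

    ratesAndPreds
      .map { case ((user, product), (rate, pred)) =>
        s"$user\t$product\t$rate\t$pred"
      }
      .saveAsTextFile("/path/CFOutput")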