Re: Writing collection to file error

2014-11-24 Thread Akhil Das
Hi Saurabh,

Here your ratesAndPreds is an RDD[((Int, Int), (Double, Double))],
not an Array. If you want to save it to disk, you can simply call
saveAsTextFile and provide the output location.

So change your last line from this:

ratesAndPreds.foreach(pw.println)


to this

ratesAndPreds.saveAsTextFile("/path/CFOutput")
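
As a side note, if you really want a single local file, a minimal sketch (assuming the result set is small enough to fit in driver memory, and reusing the placeholder paths from your code) is to collect the RDD to the driver before writing. That also sidesteps the "Task not serializable" error, since the PrintWriter then never has to be shipped to the executors:

import java.io.{File, PrintWriter}

// Bring the (small) result set back to the driver, then write one local file.
val pw = new PrintWriter(new File("/path/CFOutput.txt"))
ratesAndPreds.collect().foreach(pw.println)
pw.close()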





Thanks
Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal saurabh.agra...@markit.com wrote:

 import org.apache.spark.mllib.recommendation.ALS
 import org.apache.spark.mllib.recommendation.Rating
 import java.io.{File, PrintWriter}

 // Load and parse the data
 val data = sc.textFile("/path/CFReady.txt")
 val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
   Rating(user.toInt, item.toInt, rate.toDouble)
 })

 // Build the recommendation model using ALS
 val rank = 50
 val numIterations = 100
 val model = ALS.train(ratings, rank, numIterations, 0.10)

 // Evaluate the model on rating data
 val usersProducts = ratings.map { case Rating(user, product, rate) =>
   (user, product)
 }
 val predictions =
   model.predict(usersProducts).map { case Rating(user, product, rate) =>
     ((user, product), rate)
   }
 val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
   ((user, product), rate)
 }.join(predictions)
 val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
   val err = (r1 - r2)
   err * err
 }.mean()
 println("Mean Squared Error = " + MSE)

 val pw = new PrintWriter(new File("/path/CFOutput.txt"))

 ratesAndPreds.foreach(pw.println)





 Hi,



 Consider the code above: I am trying to write the output of the
 ratesAndPreds array to disk, but I get this error:



 Task not serializable
         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
         at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
         at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
         at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
         at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
         at $iwC$$iwC$$iwC.<init>(<console>:45)
         at $iwC$$iwC.<init>(<console>:47)
         at $iwC.<init>(<console>:49)
         at <init>(<console>:51)
         at .<init>(<console>:55)
         at .<clinit>(<console>)
         at .<init>(<console>:7)
         at .<clinit>(<console>)
         at $print(<console>)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:606)
         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
         at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
         at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
         at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
         at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
         at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
         at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
         at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
         at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
         at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
         at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
         at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
         at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
         at org.apache.spark.repl.Main$.main(Main.scala:31)
         at org.apache.spark.repl.Main.main(Main.scala)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

RE: Writing collection to file error

2014-11-24 Thread Saurabh Agrawal

Thanks for your help Akhil; however, this is creating an output folder and 
storing the result sets in multiple files. Also, the record count in the result 
set seems to have multiplied!! Is there any other way to achieve this?

Thanks!!

Regards,
Saurabh Agrawal
Vice President

Markit

Green Boulevard
B-9A, Tower C
3rd Floor, Sector - 62,
Noida 201301, India
+91 120 611 8274 Office


Re: Writing collection to file error

2014-11-24 Thread Akhil Das
To get the results in a single file, you could do a repartition(1) and then
save it.

ratesAndPreds.repartition(1).saveAsTextFile("/path/CFOutput")
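
An alternative sketch of the same idea (using the same placeholder output path): coalesce(1) also pulls everything into a single partition, and unlike repartition(1) it avoids a full shuffle, as long as the data is small enough to fit in one partition:

// coalesce(1) merges the existing partitions into one without a full shuffle,
// so the output directory ends up containing a single part file.
ratesAndPreds.coalesce(1).saveAsTextFile("/path/CFOutput")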



Thanks
Best Regards
