Sorry, you can't call predict() inside a task on the cluster: the model holds 
RDDs, and predict() does a lookup() on them, which only works from the driver 
program. A recent patch added a parallel predict method: 
https://github.com/apache/incubator-spark/pull/328/files. You can copy the code 
from that method (it does a join) and call it yourself in Spark 0.8.x.
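
As a rough sketch of that join-based approach (not the exact code from the 
patch): JoinPredict, predictAll and dot are hypothetical names used only for 
illustration, and the sketch assumes the model's userFeatures and 
productFeatures are RDDs of (id, Array[Double]) pairs, as in Spark 0.8.x.

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits (join) on Spark 0.8.x
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

// Hypothetical helper object; not part of MLlib.
object JoinPredict {
  // Plain dot product of two equal-length factor vectors.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "factor vectors must have the same rank")
    var sum = 0.0
    var i = 0
    while (i < a.length) { sum += a(i) * b(i); i += 1 }
    sum
  }

  // Predicts ratings for many (user, product) pairs by joining against the
  // model's factor RDDs, instead of calling model.predict inside a task.
  def predictAll(model: MatrixFactorizationModel,
                 userProducts: RDD[(Int, Int)]): RDD[Rating] = {
    // Attach user factors, then re-key by product for the second join.
    val byProduct = model.userFeatures.join(userProducts).map {
      case (user, (uFeatures, product)) => (product, (user, uFeatures))
    }
    // Attach product factors and score each pair.
    byProduct.join(model.productFeatures).map {
      case (product, ((user, uFeatures), pFeatures)) =>
        Rating(user, product, dot(uFeatures, pFeatures))
    }
  }
}
```

With the code from the original message, this would be called from the driver 
as JoinPredict.predictAll(result, allUserMoviePair). Both joins are inner 
joins, so any pair whose user or product has no factors in the model is 
dropped from the output.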

Matei

On Jan 7, 2014, at 10:23 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:

> Hi, all
> 
> I'm trying out ALS in MLlib.
> 
> Here is my code:
> 
>     val result = als.run(ratingRDD)
>     val allMovies = ratingRDD.map(rating => rating.product).distinct()
>     val allUsers = ratingRDD.map(rating => rating.user).distinct()
>     val allUserMoviePair = allUsers.cartesian(allMovies)
>     val resultRDD = allUserMoviePair.map(userMoviePair => {
>       var str = ""
>       str += (userMoviePair._1 + ","  + userMoviePair._2 + "," +
>         result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
>       str
>     })
> 
> 
> Every time, result.predict throws an exception like this:
> 
> scala.MatchError: null
>       at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
>       at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
>       at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
>       at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
>       at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>       at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>       at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
>       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
>       at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>       at org.apache.spark.scheduler.Task.run(Task.scala:53)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>       at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:724)
> 
> If I change the code to collect the pairs into an array in the driver 
> program, it works:
> 
>     val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
>       var str = ""
>       str += (userMoviePair._1 + ","  + userMoviePair._2 + "," +
>         result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
>       str
>     })
> 
> So the exception seems to be related to how the MatrixFactorizationModel 
> is shared across partitions?
> 
> Can anyone give me a hint?
> 
> Thank you very much!
> 
> -- 
> Nan Zhu
> 
