Sorry, you actually can't call predict() on the cluster, because the model contains RDDs, and RDDs can't be referenced from inside another RDD's tasks. There was a recent patch that added a parallel predict method, here: https://github.com/apache/incubator-spark/pull/328/files. You can take the approach from that method (which does a join against the model's feature RDDs) and implement it yourself on Spark 0.8.x.
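For reference, a minimal sketch of that join-based approach (the object and method names here are illustrative, not the exact code from the patch): key the (user, product) pairs by user, join against the model's user factors, re-key by product, join against the product factors, and take the dot product of the two factor vectors.

```scala
import org.apache.spark.SparkContext._ // brings in PairRDDFunctions (join)
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.Rating

// Hypothetical helper: predict many (user, product) pairs in parallel by
// joining against the factor RDDs, instead of calling model.predict()
// inside a closure (which fails because the model holds RDDs).
object BatchPredict {
  def predictAll(
      usersProducts: RDD[(Int, Int)],              // pairs to score
      userFeatures: RDD[(Int, Array[Double])],     // model.userFeatures
      productFeatures: RDD[(Int, Array[Double])])  // model.productFeatures
    : RDD[Rating] = {
    usersProducts
      .join(userFeatures)                          // (user, (product, uF))
      .map { case (user, (product, uF)) => (product, (user, uF)) }
      .join(productFeatures)                       // (product, ((user, uF), pF))
      .map { case (product, ((user, uF), pF)) =>
        // predicted rating = dot product of the two factor vectors
        val score = uF.zip(pF).map(p => p._1 * p._2).sum
        Rating(user, product, score)
      }
  }
}
```

The result is itself an RDD, so you can saveAsTextFile it directly rather than collecting to the driver.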
Matei

On Jan 7, 2014, at 10:23 PM, Nan Zhu <zhunanmcg...@gmail.com> wrote:

> Hi, all
>
> I'm trying the ALS in mllib.
>
> The following is my code:
>
> val result = als.run(ratingRDD)
> val allMovies = ratingRDD.map(rating => rating.product).distinct()
> val allUsers = ratingRDD.map(rating => rating.user).distinct()
> val allUserMoviePair = allUsers.cartesian(allMovies)
> val resultRDD = allUserMoviePair.map(userMoviePair => {
>   var str = ""
>   str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
>     result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
>   str
> })
>
> Every time, result.predict throws an exception like:
>
> scala.MatchError: null
>     at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
>     at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
>     at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
>     at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
>     at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:724)
>
> If I change the code to pull the partitions into an array in the driver
> program, it works:
>
> val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
>   var str = ""
>   str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
>     result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
>   str
> })
>
> So the exception seems to be related to how the MatrixFactorizationModel
> is shared across partitions?
>
> Can anyone give me a hint?
>
> Thank you very much!
>
> --
> Nan Zhu