Great, thank you Matei!
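
For anyone who finds this thread later: below is a rough sketch of the join-based prediction that Matei describes. It is not the exact code from that pull request; it assumes the model's userFeatures and productFeatures RDDs (of (id, Array[Double]) pairs) are accessible, and the helper name predictMany is made up for illustration.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

// Score many (user, product) pairs at once by joining them against the
// model's factor RDDs, instead of calling predict() once per pair.
// NOTE: sketch only; names and field visibility are assumptions.
def predictMany(model: MatrixFactorizationModel,
                userProducts: RDD[(Int, Int)]): RDD[Rating] = {
  // Attach each user's latent factors to its (user, product) pairs.
  val withUserFeatures = userProducts.join(model.userFeatures).map {
    case (user, (product, userFactors)) => (product, (user, userFactors))
  }
  // Attach each product's latent factors and take the dot product.
  withUserFeatures.join(model.productFeatures).map {
    case (product, ((user, userFactors), productFactors)) =>
      val score = userFactors.zip(productFactors).map { case (u, p) => u * p }.sum
      Rating(user, product, score)
  }
}

With the cartesian pairs from the code below, the call would be something like predictMany(result, allUserMoviePair), and the resulting RDD[Rating] can then be mapped to strings and saved without ever calling predict() inside a task.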

--  
Nan Zhu



On Wednesday, January 8, 2014 at 12:33 AM, Matei Zaharia wrote:

> Sorry, you actually can’t call predict() on the cluster because the model 
> contains some RDDs. There was a recent patch that added a parallel predict 
> method, here: https://github.com/apache/incubator-spark/pull/328/files. You 
> can grab the code from that method there (which does a join) and call that 
> yourself in Spark 0.8.x.
>  
> Matei
>  
> On Jan 7, 2014, at 10:23 PM, Nan Zhu <[email protected]> wrote:
> > Hi, all  
> >  
> > I'm trying the ALS implementation in MLlib.
> >  
> > The following is my code:
> >  
> > val result = als.run(ratingRDD)
> > val allMovies = ratingRDD.map(rating => rating.product).distinct()
> > val allUsers = ratingRDD.map(rating => rating.user).distinct()
> > val allUserMoviePair = allUsers.cartesian(allMovies)
> > val resultRDD = allUserMoviePair.map(userMoviePair => {
> >   var str = ""
> >   str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
> >     result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
> >   str
> > })
> >  
> >  
> >  
> > Every time, result.predict throws an exception like:
> >  
> > scala.MatchError: null
> >     at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
> >     at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
> >     at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
> >     at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
> >     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
> >     at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
> >     at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
> >     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
> >     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
> >     at org.apache.spark.scheduler.Task.run(Task.scala:53)
> >     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
> >     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
> >     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:724)
> >  
> > If I change the code to pull everything into an array in the driver program, it works:
> >  
> > val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
> >   var str = ""
> >   str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
> >     result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
> >   str
> > })
> >  
> >  
> > So the exception seems to be related to how the MatrixFactorizationModel is shared across partitions?
> >  
> > Can anyone give me a hint?
> >  
> > Thank you very much!
> >  
> > --  
> > Nan Zhu
> >  
>  
