Hi all,

I'm trying out ALS in MLlib.

Here is my code:

    val result = als.run(ratingRDD)
    val allMovies = ratingRDD.map(rating => rating.product).distinct()
    val allUsers = ratingRDD.map(rating => rating.user).distinct()
    val allUserMoviePair = allUsers.cartesian(allMovies)
    val resultRDD = allUserMoviePair.map { userMoviePair =>
      userMoviePair._1 + "," + userMoviePair._2 + "," +
        result.predict(userMoviePair._1, userMoviePair._2) + "\n"
    }



Every time, result.predict throws an exception like this:

scala.MatchError: null
    at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
    at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
    at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
    at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
    at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
    at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
    at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

If I change the code to first collect the pairs into an array in the driver
program, it works:

    val resultRDD = allUserMoviePair.collect().map { userMoviePair =>
      userMoviePair._1 + "," + userMoviePair._2 + "," +
        result.predict(userMoviePair._1, userMoviePair._2) + "\n"
    }


So the exception seems to be related to how the MatrixFactorizationModel
is shared across partitions?

Can anyone give me a hint?
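
For context, the stack trace shows result.predict calling PairRDDFunctions.lookup, which is itself an RDD operation, and Spark does not support running RDD operations from inside a task (here, the closure passed to map). That would explain why the collect() variant works: there, predict runs in the driver. A possible variant that keeps the work distributed is sketched below. It assumes the MLlib version in use provides the predict overload that takes an RDD of (user, product) pairs, which may not be the case for this build; the object name PredictAllPairs and the helper formatLine are hypothetical names used only for illustration.

```scala
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD

object PredictAllPairs {
  // Hypothetical helper: formats one prediction as "user,product,rating".
  def formatLine(user: Int, product: Int, rating: Double): String =
    s"$user,$product,$rating"

  // Scores every (user, movie) combination without calling predict inside a task.
  // Assumption: this MLlib version has predict(RDD[(Int, Int)]): RDD[Rating].
  def predictAll(model: MatrixFactorizationModel, ratings: RDD[Rating]): RDD[String] = {
    val allUsers = ratings.map(_.user).distinct()
    val allMovies = ratings.map(_.product).distinct()
    val allUserMoviePair: RDD[(Int, Int)] = allUsers.cartesian(allMovies)
    // The bulk overload is invoked from the driver, so no RDD is touched inside a closure.
    model.predict(allUserMoviePair).map(r => formatLine(r.user, r.product, r.rating))
  }
}
```

With the names from the code above, this would be called as PredictAllPairs.predictAll(result, ratingRDD). The lines carry no trailing "\n" because saveAsTextFile already writes one record per line.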

Thank you very much!

--  
Nan Zhu
