Hi all,
I'm trying ALS in MLlib. The following is my code:
val result = als.run(ratingRDD)

// all distinct movies and users that appear in the ratings
val allMovies = ratingRDD.map(rating => rating.product).distinct()
val allUsers = ratingRDD.map(rating => rating.user).distinct()

// every possible (user, movie) combination
val allUserMoviePair = allUsers.cartesian(allMovies)

// score each pair with the trained model and format it as a CSV line
val resultRDD = allUserMoviePair.map { case (user, movie) =>
  user + "," + movie + "," + result.predict(user, movie) + "\n"
}
Every time, result.predict throws an exception like this:
scala.MatchError: null
    at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
    at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
    at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
    at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
    at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
    at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
    at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
If I change the code to collect() the pairs into a local array in the driver program, it works:
val resultRDD = allUserMoviePair.collect().map { case (user, movie) =>
  // collect() brings every pair to the driver, so predict runs locally
  user + "," + movie + "," + result.predict(user, movie) + "\n"
}
So the exception seems to be related to how the MatrixFactorizationModel is shared with each partition? My guess is that the model's userFeatures and productFeatures are themselves RDDs, and the lookup that predict does on them cannot run inside another RDD's map on the workers. Can anyone give me a hint?
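In case it helps to show what I'm after, here is the workaround I'm experimenting with: instead of calling predict inside the map, I join each (user, movie) pair against the model's userFeatures and productFeatures and take the dot product of the two factor vectors myself. This is only a sketch: scoreAll is just my own helper name, and the dot product is my assumption about what predict computes internally, so please correct me if the model does something different.

import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark versions)
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.rdd.RDD

// Score every (user, movie) pair without calling predict() inside a task:
// join the pairs with the user factors, re-key by movie, join with the
// product factors, then combine the two latent vectors with a dot product.
def scoreAll(model: MatrixFactorizationModel,
             pairs: RDD[(Int, Int)]): RDD[String] = {
  pairs
    .join(model.userFeatures)              // (user, (movie, userVec))
    .map { case (user, (movie, userVec)) => (movie, (user, userVec)) }
    .join(model.productFeatures)           // (movie, ((user, userVec), movieVec))
    .map { case (movie, ((user, userVec), movieVec)) =>
      val score = userVec.zip(movieVec).map { case (u, m) => u * m }.sum
      user + "," + movie + "," + score
    }
}

val resultRDD = scoreAll(result, allUserMoviePair)

This avoids the nested lookup, but it shuffles the full cross product twice, so I'm not sure it's the right approach either. Is there a built-in way to do bulk predictions?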
Thank you very much!
--
Nan Zhu