Hello,

I am currently using MLlib ALS to process a large volume of data, about 1.2
billion Rating(userId, productId, rating) triples. The dataset was separated
into 4000 partitions for parallelized computation on our YARN cluster.
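
For reference, the training step looks roughly like this (a minimal sketch:
sc is the SparkContext, and the input path, separator, rank, iteration
count, and lambda are placeholders rather than our exact settings):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // Parse the raw triples and repartition for parallel training;
    // the input path and the tab separator are placeholders.
    val ratings = sc.textFile("hdfs://.../ratings").map { line =>
      val Array(user, product, rate) = line.split("\t")
      Rating(user.toInt, product.toInt, rate.toDouble)
    }.repartition(4000)

    // Train ALS; rank = 50, iterations = 10, lambda = 0.01 are illustrative.
    val model = ALS.train(ratings, 50, 10, 0.01)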

I encountered the error "Error communicating with MapOutputTracker" when
trying to get the predicted ratings [model.predict(usersProducts)] after the
iterations finished.

    // Key each prediction by its (user, product) pair.
    val predictions = model.predict(usersProducts).map {
      case Rating(user, product, rate) => ((user, product), rate)
    }
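
For context, usersProducts here is just the (user, product) pairs taken from
the training set (a sketch, assuming the same ratings RDD used for training):

    // Strip the ratings down to the (user, product) pairs to predict on.
    val usersProducts = ratings.map {
      case Rating(user, product, _) => (user, product)
    }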

I tried to separate the iterative training from the calculation of the
predicted ratings by first storing the two feature matrices in the file
system and then loading them back for prediction. This time, the error
occurred at the stage of loading userFeatures.
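
The save step looked roughly like this (a sketch: the output paths are
placeholders, and splitmark_1 / splitmark_2 stand in for our real field and
feature separators):

    // Persist each factor matrix as text, one id/features line per row.
    model.userFeatures
      .map { case (id, features) => id + splitmark_1 + features.mkString(splitmark_2) }
      .saveAsTextFile("hdfs://.../userFeatures")
    model.productFeatures
      .map { case (id, features) => id + splitmark_1 + features.mkString(splitmark_2) }
      .saveAsTextFile("hdfs://.../productFeatures")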

Each line of userfData has the form userId:[0.3,0.5,0.002,.....]

    val userfTuple = userfData.map { line =>
      // Split "userId:[features]" into the id and its feature values;
      // splitmark_1 / splitmark_2 are the field and feature separators.
      val arr = line.split(splitmark_1)
      val featureArr = arr(1).split(splitmark_2)
      (arr(0), featureArr)
    }
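
The parsed strings are then turned back into numeric features; an action
such as count is what actually triggers the failing stage (a sketch of that
conversion):

    // Rebuild the (Int, Array[Double]) shape of the original userFeatures;
    // count() forces the load to run.
    val userFeatures = userfTuple.map {
      case (id, features) => (id.toInt, features.map(_.toDouble))
    }
    userFeatures.count()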

Here is part of the log:
----------------------------------------------------------------------------

14-05-21 14:37:17 WARN [Result resolver thread-0] TaskSetManager: Loss was
due to org.apache.spark.SparkException
org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:79)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:126)
        at org.apache.spark.BlockStoreShuffleFetcher.fetch(BlockStoreShuffleFetcher.scala:43)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:61)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:244)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:235)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:90)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:89)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:727)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:723)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:220)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

----------------------------------------------------------------------------

I have tried several methods to work around this problem: one was to
decrease the number of partitions (from 4000 to 3000); another was to
increase the memory of the masters. Both worked, but it still seems vital to
track down the underlying cause, right?
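
Concretely, the two workarounds looked roughly like this (a sketch: the
repartition count matches what we tried, while the config key and memory
value are illustrative, since we actually raised memory through our YARN
deployment settings):

    import org.apache.spark.SparkConf

    // Workaround 1: fewer partitions, which shrinks the map-output status
    // table that the MapOutputTracker has to serve to each reduce task.
    val fewerPartitions = ratings.repartition(3000)

    // Workaround 2: more memory; the key and value below are placeholders
    // for the deployment-level settings we actually changed.
    val conf = new SparkConf().set("spark.executor.memory", "8g")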

Could anyone help me look into this problem? Thanks a lot.

Sue Cai


