I tried something similar and got the error below. I had 10 executors with 8 cores each.
>>> ratings = newrdd.map(lambda l: Rating(int(l[1]),int(l[2]),l[4])).partitionBy(50)
>>> mypart = ratings.getNumPartitions()
>>> mypart
50
>>> numIterations = 10
>>> rank = 100
>>> model = ALS.trainImplicit(ratings, rank, numIterations)
[Stage 4:>           (0 + 59) / 210][Stage 6:>            (0 + 21) / 210]
[Stage 4:===========> (169 + 41) / 210][Stage 6:=>        (24 + 39) / 210]
[Stage 6:=============================================>   (178 + 32) / 210]
[Stage 7:>            (0 + 80) / 200]
15/06/26 21:25:11 ERROR TaskSetManager: Task 35 in stage 7.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py", line 200, in trainImplicit
    model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank,
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py", line 181, in _prepare
    first = ratings.first()
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1283, in first
    rs = self.take(1)
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1265, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/context.py", line 897, in runJob
    allowLocal)
  File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 7.0 failed 4 times, most recent failure: Lost task 35.3 in stage 7.0 (TID 1213, gsbl407n02.blue.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1669, in add_shuffle_key
ValueError: too many values to unpack

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
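Looking at the trace, the "ValueError: too many values to unpack" raised in add_shuffle_key is most likely the partitionBy(50) call: partitionBy only works on an RDD of (key, value) pairs, and a Rating is a 3-tuple, so the Python shuffle code fails when it tries to unpack each element into a key and a value. A minimal sketch of a workaround (untested, assuming the same newrdd and the 1.3/1.4 MLlib Python API): skip the manual partitionBy and either repartition the RDD, or let ALS control partitioning through its blocks argument.

from pyspark.mllib.recommendation import ALS, Rating

# Build the ratings RDD without partitionBy(); repartition() works on any RDD
# because it does not need (key, value) pairs. float() on the rating field is
# an assumption here -- the raw column is a string in the original snippet.
ratings = newrdd.map(lambda l: Rating(int(l[1]), int(l[2]), float(l[4]))).repartition(50)

# Alternatively, leave the partitioning to ALS itself via the blocks argument
# (a keyword argument on train/trainImplicit in the 1.3/1.4 Python API).
model = ALS.trainImplicit(ratings, rank=100, iterations=10, blocks=50, alpha=0.01)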
On Jun 26, 2015, at 12:43 PM, Ravi Mody <rmody...@gmail.com> wrote:

> I set the number of partitions on the input dataset at 50. The number of CPU
> cores I'm using is 84 (7 executors, 12 cores).
>
> I'll look into getting a full stack trace. Any idea what my errors mean, and
> why increasing memory causes them to go away? Thanks.
>
> On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng <men...@gmail.com> wrote:
> Please see my comments inline. It would be helpful if you can attach
> the full stack trace. -Xiangrui
>
> On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody <rmody...@gmail.com> wrote:
> > 1. These are my settings:
> > rank = 100
> > iterations = 12
> > users = ~20M
> > items = ~2M
> > training examples = ~500M-1B (I'm running into the issue even with 500M
> > training examples)
> >
>
> Did you set number of blocks? If you didn't, could you check how many
> partitions you have in the ratings RDD? Setting a large number of
> blocks would increase shuffle size. If you have enough RAM, try to set
> number of blocks to the number of CPU cores or less.
>
> > 2. The memory storage never seems to go too high. The user blocks may go up
> > to ~10Gb, and each executor will have a few GB used out of 30 free GB.
> > Everything seems small compared to the amount of memory I'm using.
> >
>
> This looks correct.
>
> > 3. I think I have a lot of disk space - is this on the executors or the
> > driver? Is there a way to know if the error is coming from disk space.
> >
>
> You can see the shuffle data size for each iteration from the WebUI.
> Usually, it should throw an out of disk space exception instead of the
> message you posted. But it is worth checking.
>
> > 4. I'm not changing checkpointing settings, but I think checkpointing
> > defaults to every 10 iterations? One notable thing is the crashes often
> > start on or after the 9th iteration, so it may be related to checkpointing.
> > But this could just be a coincidence.
> >
>
> If you didn't set checkpointDir in SparkContext, the
> checkpointInterval setting in ALS has no effect.
>
> > Thanks!
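For reference, a hedged sketch of the two suggestions above: keep the number of ALS blocks at or below the number of CPU cores, and set a checkpoint directory on the SparkContext so the ALS checkpointing can actually take effect. The HDFS path and the exact values are illustrative only, not taken from the thread.

# Checkpointing only happens if a checkpoint directory is configured on the SparkContext.
sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")   # hypothetical path

model = ALS.trainImplicit(
    ratings,           # plain RDD[Rating], not manually partitioned
    rank=100,
    iterations=12,
    blocks=84,         # roughly the 84 CPU cores mentioned above (7 executors x 12 cores)
    alpha=0.01)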
> > On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat <ayman.fara...@yahoo.com>
> > wrote:
> >>
> >> was there any resolution to that problem?
> >> I am also having that with Pyspark 1.4
> >> 380 Million observations
> >> 100 factors and 5 iterations
> >> Thanks
> >> Ayman
> >>
> >> On Jun 23, 2015, at 6:20 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>
> >> > It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
> >> > more information to guess what happened:
> >> >
> >> > 1. Could you share the ALS settings, e.g., number of blocks, rank and
> >> > number of iterations, as well as number of users/items in your dataset?
> >> > 2. If you monitor the progress in the WebUI, how much data is stored
> >> > in memory and how much data is shuffled per iteration?
> >> > 3. Do you have enough disk space for the shuffle files?
> >> > 4. Did you set checkpointDir in SparkContext and checkpointInterval in ALS?
> >> >
> >> > Best,
> >> > Xiangrui
> >> >
> >> > On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody <rmody...@gmail.com> wrote:
> >> >> Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on fairly
> >> >> large datasets (1+ billion input records). As I grow my dataset I often run
> >> >> into issues with a lot of failed stages and dropped executors, ultimately
> >> >> leading to the whole application failing. The errors are like
> >> >> "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> >> >> location for shuffle 19" and "org.apache.spark.shuffle.FetchFailedException:
> >> >> Failed to connect to...". These occur during flatMap, mapPartitions, and
> >> >> aggregate stages. I know that increasing memory fixes this issue, but most
> >> >> of the time my executors are only using a tiny portion of their allocated
> >> >> memory (<10%). Often, the stages run fine until the last iteration or two
> >> >> of ALS, but this could just be a coincidence.
> >> >>
> >> >> I've tried tweaking a lot of settings, but it's time-consuming to do this
> >> >> through guess-and-check. Right now I have these set:
> >> >> spark.shuffle.memoryFraction = 0.3
> >> >> spark.storage.memoryFraction = 0.65
> >> >> spark.executor.heartbeatInterval = 600000
> >> >>
> >> >> I'm sure these settings aren't optimal - any idea of what could be causing
> >> >> my errors, and what direction I can push these settings in to get more out
> >> >> of my memory? I'm currently using 240 GB of memory (on 7 executors) for a
> >> >> 1 billion record dataset, which seems like too much. Thanks!
> >> >
> >> > ---------------------------------------------------------------------
> >> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> > For additional commands, e-mail: user-h...@spark.apache.org
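For completeness, a sketch of the three settings Ravi lists in the original post, expressed as a SparkConf at application start-up rather than in a properties file. The values are simply the ones quoted in the thread, not a recommendation.

from pyspark import SparkConf, SparkContext

# Configuration copied from the thread above (pre-1.6 memory-fraction settings).
conf = (SparkConf()
        .set("spark.shuffle.memoryFraction", "0.3")
        .set("spark.storage.memoryFraction", "0.65")
        .set("spark.executor.heartbeatInterval", "600000"))
sc = SparkContext(conf=conf)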