I tried something similar and got the error below.
I had 10 executors with 8 cores each.


>>> ratings = newrdd.map(lambda l:
...     Rating(int(l[1]), int(l[2]), l[4])).partitionBy(50)
>>> mypart = ratings.getNumPartitions()
>>> mypart
50
>>> numIterations =10
>>> rank = 100
>>> model = ALS.trainImplicit(ratings, rank, numIterations)
[Stage 4:>               (0 + 59) / 210][Stage 6:>               (0 + 21) / 210]
[Stage 4:===========>  (169 + 41) / 210][Stage 6:=>             (24 + 39) / 210]
[Stage 6:=============================================>        (178 + 32) / 210]
[Stage 7:>                                                       (0 + 80) / 200]
15/06/26 21:25:11 ERROR TaskSetManager: Task 35 in stage 7.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py", line 200, in trainImplicit
    model = callMLlibFunc("trainImplicitALSModel", cls._prepare(ratings), rank,
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/mllib/recommendation.py", line 181, in _prepare
    first = ratings.first()
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1283, in first
    rs = self.take(1)
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1265, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/context.py", line 897, in runJob
    allowLocal)
  File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/homes/afarahat/aofspark/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 7.0 failed 4 times, most recent failure: Lost task 35.3 in stage 7.0 (TID 1213, gsbl407n02.blue.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/grid/3/tmp/yarn-local/usercache/afarahat/appcache/application_1433921068880_966242/container_1433921068880_966242_01_000006/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/homes/afarahat/aofspark/share/spark/python/pyspark/rdd.py", line 1669, in add_shuffle_key
ValueError: too many values to unpack

        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
        at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:315)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
On Jun 26, 2015, at 12:43 PM, Ravi Mody <rmody...@gmail.com> wrote:

> I set the number of partitions on the input dataset at 50. The number of CPU 
> cores I'm using is 84 (7 executors, 12 cores). 
> 
> I'll look into getting a full stack trace. Any idea what my errors mean, and 
> why increasing memory causes them to go away? Thanks. 
> 
> On Fri, Jun 26, 2015 at 11:26 AM, Xiangrui Meng <men...@gmail.com> wrote:
> Please see my comments inline. It would be helpful if you can attach
> the full stack trace. -Xiangrui
> 
> On Fri, Jun 26, 2015 at 7:18 AM, Ravi Mody <rmody...@gmail.com> wrote:
> > 1. These are my settings:
> > rank = 100
> > iterations = 12
> > users = ~20M
> > items = ~2M
> > training examples = ~500M-1B (I'm running into the issue even with 500M
> > training examples)
> >
> 
> Did you set the number of blocks? If you didn't, could you check how many
> partitions you have in the ratings RDD? Setting a large number of
> blocks increases the shuffle size. If you have enough RAM, try setting the
> number of blocks to the number of CPU cores or less.
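>
> For example (a sketch, untested; 84 is just the core count you mentioned,
> and blocks=-1, the default, lets ALS choose):
>
>     model = ALS.trainImplicit(ratings, rank, iterations, blocks=84)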
> 
> > 2. The memory storage never seems to go too high. The user blocks may go up
> > to ~10 GB, and each executor will have a few GB used out of 30 GB free.
> > Everything seems small compared to the amount of memory I'm using.
> >
> 
> This looks correct.
> 
> > 3. I think I have a lot of disk space - is this on the executors or the
> > driver? Is there a way to know if the error is coming from disk space?
> >
> 
> You can see the shuffle data size for each iteration from the WebUI.
> Usually, running out of disk space throws an explicit out-of-space
> exception rather than the message you posted, but it is worth checking.
> 
> > 4. I'm not changing checkpointing settings, but I think checkpointing
> > defaults to every 10 iterations? One notable thing is the crashes often
> > start on or after the 9th iteration, so it may be related to checkpointing.
> > But this could just be a coincidence.
> >
> 
> If you didn't set checkpointDir in SparkContext, the
> checkpointInterval setting in ALS has no effect.
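>
> For example, a minimal setup (the directory below is only a placeholder):
>
>     sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")  # hypothetical path
>     model = ALS.trainImplicit(ratings, rank, numIterations)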
> 
> > Thanks!
> >
> >
> >
> >
> >
> > On Fri, Jun 26, 2015 at 1:08 AM, Ayman Farahat <ayman.fara...@yahoo.com>
> > wrote:
> >>
> >> Was there any resolution to that problem?
> >> I am also hitting it with PySpark 1.4:
> >> 380 million observations
> >> 100 factors and 5 iterations
> >> Thanks
> >> Ayman
> >>
> >> On Jun 23, 2015, at 6:20 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>
> >> > It shouldn't be hard to handle 1 billion ratings in 1.3. Just need
> >> > more information to guess what happened:
> >> >
> >> > 1. Could you share the ALS settings, e.g., number of blocks, rank and
> >> > number of iterations, as well as number of users/items in your
> >> > dataset?
> >> > 2. If you monitor the progress in the WebUI, how much data is stored
> >> > in memory and how much data is shuffled per iteration?
> >> > 3. Do you have enough disk space for the shuffle files?
> >> > 4. Did you set checkpointDir in SparkContext and checkpointInterval in
> >> > ALS?
> >> >
> >> > Best,
> >> > Xiangrui
> >> >
> >> > On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody <rmody...@gmail.com> wrote:
> >> >> Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on
> >> >> fairly
> >> >> large datasets (1+ billion input records). As I grow my dataset I often
> >> >> run
> >> >> into issues with a lot of failed stages and dropped executors,
> >> >> ultimately
> >> >> leading to the whole application failing. The errors are like
> >> >> "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
> >> >> output
> >> >> location for shuffle 19" and
> >> >> "org.apache.spark.shuffle.FetchFailedException:
> >> >> Failed to connect to...". These occur during flatMap, mapPartitions,
> >> >> and
> >> >> aggregate stages. I know that increasing memory fixes this issue, but
> >> >> most
> >> >> of the time my executors are only using a tiny portion of their
> >> >> allocated memory (<10%). Often, the stages run fine until the last
> >> >> iteration
> >> >> or two of ALS, but this could just be a coincidence.
> >> >>
> >> >> I've tried tweaking a lot of settings, but it's time-consuming to do
> >> >> this
> >> >> through guess-and-check. Right now I have these set:
> >> >> spark.shuffle.memoryFraction = 0.3
> >> >> spark.storage.memoryFraction = 0.65
> >> >> spark.executor.heartbeatInterval = 600000
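> >> >>
> >> >> For reference, a sketch of how I set these (via SparkConf before the
> >> >> context is created; values mirror the list above):
> >> >>
> >> >>     from pyspark import SparkConf, SparkContext
> >> >>     conf = (SparkConf()
> >> >>             .set("spark.shuffle.memoryFraction", "0.3")
> >> >>             .set("spark.storage.memoryFraction", "0.65")
> >> >>             .set("spark.executor.heartbeatInterval", "600000"))
> >> >>     sc = SparkContext(conf=conf)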
> >> >>
> >> >> I'm sure these settings aren't optimal - any idea of what could be
> >> >> causing
> >> >> my errors, and what direction I can push these settings in to get more
> >> >> out
> >> >> of my memory? I'm currently using 240 GB of memory (on 7 executors) for
> >> >> a 1
> >> >> billion record dataset, which seems like too much. Thanks!
> >> >
> >>
> >
> 
