Davies,

That was it. Removing the call to cache() let the job run successfully, but
this challenges my understanding of how Spark handles caching.

I thought it was safe to cache data sets larger than the cluster could hold
in memory: Spark would cache as much as it could and leave the rest to be
read from disk.

Is that not correct?
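
In case it helps clarify what I was expecting: here's a minimal sketch of what
I assumed the "safe" version of this would look like, if the default cache()
storage level (MEMORY_ONLY) is the problem. The S3 path is a placeholder like
the one in the transcript below, and I haven't verified that this actually
avoids the OOM:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK spills partitions that don't fit in memory to disk,
// instead of the MEMORY_ONLY level that cache() uses.
val a = sc.textFile("s3n://some-path/*.json",
  minPartitions = sc.defaultParallelism * 3).persist(StorageLevel.MEMORY_AND_DISK)

a.map(_.length).max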

Nick

On Thu, Jul 31, 2014 at 5:04 PM, Davies Liu <dav...@databricks.com> wrote:

> Maybe it's because you're trying to cache all the data in memory, but the
> JVM heap is not big enough.
>
> If you remove the .cache(), is there still a problem?
>
> On Thu, Jul 31, 2014 at 1:33 PM, Nicholas Chammas
> <nicholas.cham...@gmail.com> wrote:
> > Hmm, looking at this stack trace a bit more carefully, it looks like the
> > code in the Hadoop API for reading data from the source choked. Is that
> > correct?
> >
> > Perhaps there is a missing newline (or two, or more) that makes one line of
> > data too much to read in at once? I'm just guessing here. Gonna try to
> > track this down real quick.
> >
> > Btw, I'm seeing this on 1.0.1 as well, so it's not a regression in
> > 1.0.2-rc1 or anything like that.
> >
> > Nick
> >
> >
> > On Thu, Jul 31, 2014 at 4:18 PM, Nicholas Chammas
> > <nicholas.cham...@gmail.com> wrote:
> >>
> >> So if I try this again but in the Scala shell (as opposed to the Python
> >> one), this is what I get:
> >>
> >> scala> val a = sc.textFile("s3n://some-path/*.json",
> >> minPartitions=sc.defaultParallelism * 3).cache()
> >> a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
> >> <console>:12
> >>
> >> scala> a.map(_.length).max
> >> 14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available
> >> 14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22)
> >> 14/07/31 20:10:41 WARN TaskSetManager: Loss was due to
> >> java.lang.OutOfMemoryError
> >> java.lang.OutOfMemoryError: GC overhead limit exceeded
> >>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
> >>     at java.lang.String.<init>(String.java:203)
> >>     at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
> >>     at java.nio.CharBuffer.toString(CharBuffer.java:1201)
> >>     at org.apache.hadoop.io.Text.decode(Text.java:350)
> >>     at org.apache.hadoop.io.Text.decode(Text.java:327)
> >>     at org.apache.hadoop.io.Text.toString(Text.java:254)
> >>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
> >>     at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458)
> >>     at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
> >>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> >>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> >>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> >>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >>     at java.lang.Thread.run(Thread.java:745)
> >> 14/07/31 20:10:42 ERROR TaskSchedulerImpl: Lost executor 19 on
> >> ip-10-13-142-142.ec2.internal: OutOfMemoryError
> >>
> >> So I guess I need to fiddle with some memory configs? I’m surprised that
> >> just checking input line length could trigger this.
> >>
> >> Nick
> >>
> >>
> >>
> >> On Wed, Jul 30, 2014 at 8:58 PM, Davies Liu <dav...@databricks.com> wrote:
> >>>
> >>> The exception in Python means that the worker tried to read a command
> >>> from the JVM but reached the end of the socket (the socket had been
> >>> closed). So it's possible that another exception happened in the JVM.
> >>>
> >>> Could you change the log4j log level, then check whether there is any
> >>> problem inside the JVM?
> >>>
> >>> Davies
> >>>
> >>> On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas
> >>> <nicholas.cham...@gmail.com> wrote:
> >>> > Any clues? This looks like a bug, but I can't report it without more
> >>> > precise information.
> >>> >
> >>> >
> >>> > On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas
> >>> > <nicholas.cham...@gmail.com>
> >>> > wrote:
> >>> >>
> >>> >> I’m in the PySpark shell and I’m trying to do this:
> >>> >>
> >>> >> a = sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json',
> >>> >>                 minPartitions=sc.defaultParallelism * 3).cache()
> >>> >> a.map(lambda x: len(x)).max()
> >>> >>
> >>> >> My job dies with the following:
> >>> >>
> >>> >> 14/07/30 01:46:28 WARN TaskSetManager: Loss was due to
> >>> >> org.apache.spark.api.python.PythonException
> >>> >> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
> >>> >>   File "/root/spark/python/pyspark/worker.py", line 73, in main
> >>> >>     command = pickleSer._read_with_length(infile)
> >>> >>   File "/root/spark/python/pyspark/serializers.py", line 142, in _read_with_length
> >>> >>     length = read_int(stream)
> >>> >>   File "/root/spark/python/pyspark/serializers.py", line 337, in read_int
> >>> >>     raise EOFError
> >>> >> EOFError
> >>> >>
> >>> >>     at org.apache.spark.api.python.PythonRDD$anon$1.read(PythonRDD.scala:115)
> >>> >>     at org.apache.spark.api.python.PythonRDD$anon$1.<init>(PythonRDD.scala:145)
> >>> >>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
> >>> >>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>> >>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >>> >>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
> >>> >>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
> >>> >>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> >>> >>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >>> >>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >>> >>     at java.lang.Thread.run(Thread.java:745)
> >>> >> 14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19 on
> >>> >> ip-10-190-171-217.ec2.internal: remote Akka client disassociated
> >>> >>
> >>> >> How do I debug this? I’m using 1.0.2-rc1 deployed to EC2.
> >>> >>
> >>> >> Nick
> >>> >>
> >>> >>
> >>> >
> >>> >
> >>
> >>
> >
>
