Looking at the behavior of the program, and reading the code, I think that Spark does the flatMap of all of the elements in its local partition, then serializes the results all together at the end. At that point it realizes it doesn't have enough memory, so it starts a store to disk, but by then too much memory has been consumed and the serialization starts causing memory errors. I tried to pass Spark iterators that would parse the files on the fly, but it seems like Spark expands them into full ArrayBuffers so it can figure out how much data it's working with prior to storing it on disk.
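A minimal plain-Scala sketch of the lazy-vs-expanded distinction described above (no Spark involved; `readRecords` is a hypothetical stand-in for a file-parsing loader, and the ArrayBuffer copy only mirrors the materialization the author describes, not Spark's actual internals):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a loader that parses records lazily.
def readRecords(n: Int): Iterator[String] =
  Iterator.tabulate(n)(i => s"record-$i")

// Lazy: no record exists on the heap until the iterator is consumed.
val lazyRecords = readRecords(1000000)

// Eager: copying the iterator into an ArrayBuffer (roughly the expansion
// described above) pins every element on the heap at once.
val buffered = ArrayBuffer[String]() ++= readRecords(1000)
```

The iterator costs almost nothing to create regardless of `n`; the buffer's footprint grows with the number of elements, which is where a partition larger than the heap becomes a problem.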
Any ideas about how to approach the problem?

Kyle

On Mon, Oct 14, 2013 at 12:40 PM, Kyle Ellrott <[email protected]> wrote:

> During the load of the input path RDD, I set the parallelization to the
> size of the input array (and to the size/10). That is about 600 and 6000
> tasks for the initial mapping procedure working on about 100 cpus.
>
> Kyle
>
>
> On Sun, Oct 13, 2013 at 12:15 PM, Ashish Rangole <[email protected]> wrote:
>
>> Hi Kyle,
>>
>> Have you tried increasing the default parallelization to say 3 x
>> NumCores or more?
>>
>> Ashish
>>
>> On Oct 13, 2013 12:49 AM, "Kyle Ellrott" <[email protected]> wrote:
>>
>>> I'm working on a program that takes an RDD of file names and runs a
>>> flatMap operation on the loading function to produce an RDD of loaded
>>> values. If I take that RDD and then call saveAsHadoopFile, the program
>>> works fine. However, I need to do a reduceByKey, and the total amount of
>>> data is larger than the available memory in the cluster, so I started
>>> getting Java heap errors and GC overhead errors. That was expected, and I
>>> knew the next step would be to run persist with one of the DISK options,
>>> but I kept getting memory errors.
>>>
>>> I've simplified the problem, just trying to run persist before running
>>> saveAsHadoopFile (skipping the reduceByKey), and I still get memory errors.
>>> I've tried MEMORY_AND_DISK and DISK_ONLY, and still get the memory errors.
>>> I've tried setting spark.executor.memory=2g and
>>> spark.storage.memoryFraction=0.25, no dice. Switching to
>>> 'org.apache.spark.serializer.KryoSerializer' doesn't help either.
>>>
>>>
>>> TL;DR
>>>
>>> (spark.executor.memory = 512m)
>>> myInputs.flatMap( readFile(_) ).saveAsHadoopFile( ... ) : Works fine
>>>
>>> (spark.executor.memory = 2g)
>>> myInputs.flatMap( readFile(_) ).persist(MEMORY_AND_DISK).saveAsHadoopFile( ... ) :
>>> Lots of java.lang.OutOfMemoryError exceptions (example below).
>>>
>>> Any ideas of things I could try?
>>>
>>> Kyle
>>>
>>>
>>> Typical error:
>>>
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>     at java.io.ObjectOutputStream$HandleTable.growSpine(ObjectOutputStream.java:2295)
>>>     at java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2240)
>>>     at java.io.ObjectOutputStream.writeString(ObjectOutputStream.java:1262)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1144)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
>>>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1338)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1146)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1474)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1392)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1150)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:326)
>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27)
>>>     at org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:80)
>>>     at org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:25)
>>>     at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:178)
>>>     at org.apache.spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:618)
>>>     at org.apache.spark.storage.BlockManager.put(BlockManager.scala:604)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:75)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:224)
>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
>>>     at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
>>>
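One detail worth noting about the top frames of that trace: java.io.ObjectOutputStream records every object it writes in an internal handle table (the HandleTable above) so later writes can emit back-references, and that table grows for the life of the stream. A small JDK-only sketch, not a Spark fix, showing how calling reset() on a long-lived stream keeps that table bounded:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val sink = new ByteArrayOutputStream()
val out = new ObjectOutputStream(sink)

// Each writeObject call adds an entry to the stream's handle table;
// without reset() the table only ever grows as more objects are written.
for (i <- 1 to 10000) {
  out.writeObject(s"record-$i")
  if (i % 1000 == 0) out.reset() // clears the handle table
}
out.close()
```

This only illustrates why serializing a very large block through one stream accumulates heap; whether Spark's JavaSerializationStream ever resets the stream during writeAll would have to be checked in the source.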
