Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format, i.e. "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? In other words, changing your "rdd.persist(MEMORY_AND_DISK)" to "rdd.persist(MEMORY_ONLY_SER)".
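For example, a minimal sketch of that change (assuming the Scala RDD API on Spark 1.x; enabling Kryo is an extra, optional suggestion, not something from your code):

    import org.apache.spark.storage.StorageLevel

    // Optional: Kryo is usually faster and more compact than Java serialization
    // when caching RDDs in serialized form.
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // Cache the partitions as serialized bytes instead of deserialized objects,
    // which normally cuts the in-memory footprint substantially.
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)   // or MEMORY_AND_DISK_SER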
Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid <iras...@cloudera.com> wrote:

> I agree with Richard. It looks like the issue here is shuffling, and
> shuffle data is always written to disk, so the issue is definitely not
> that all the output of flatMap has to be stored in memory.
>
> If at all possible, I'd first suggest upgrading to a newer version of
> Spark -- even in 1.2 there were big improvements to shuffle, with
> sort-based shuffle as the default.
>
> On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher <rmarsc...@localytics.com> wrote:
>
>> Are you sure it's memory related? What is the disk utilization and IO
>> performance on the workers? The error you posted looks to be related to
>> shuffle trying to obtain block data from another worker node and failing
>> to do so in a reasonable amount of time. It may still be memory related,
>> but I'm not sure that other resources are ruled out yet.
>>
>> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <octavian.ga...@inf.ethz.ch> wrote:
>>
>>> I tried using reduceByKey, without success.
>>>
>>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
>>> However, I got the same error as before, namely the error described here:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>>
>>> My task is to count the frequencies of pairs of words that occur in a
>>> set of documents at least 5 times. I know that this final output is
>>> sparse and should comfortably fit in memory. However, the intermediate
>>> pairs that are spilled by flatMap might need to be stored on disk, but I
>>> don't understand why the persist option does not work and my job fails.
>>>
>>> My code:
>>>
>>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
>>>   .reduceByKey((a, b) => (a + b).toShort)
>>>   .filter({ case ((x, y), count) => count >= 5 })
>>>
>>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node.
>>> One node I keep for the master, 7 nodes for the workers.
>>>
>>> My conf:
>>>
>>> conf.set("spark.cores.max", "128")
>>> conf.set("spark.akka.frameSize", "1024")
>>> conf.set("spark.executor.memory", "115g")
>>> conf.set("spark.shuffle.file.buffer.kb", "1000")
>>>
>>> My spark-env.sh:
>>>
>>> ulimit -n 200000
>>> SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
>>> SPARK_DRIVER_MEMORY=129G
>>>
>>> Spark version: 1.1.1
>>>
>>> Thank you a lot for your help!
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
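For reference, a minimal sketch of how the quoted pipeline might look with a serialized cache and an explicit shuffle parallelism. outputPairsOfWords is the function from the quoted code and is assumed here to emit ((word1, word2), 1.toShort); the partition count of 2000 is purely an illustrative assumption, not a tuned value:

    import org.apache.spark.storage.StorageLevel

    val counts =
      rdd.persist(StorageLevel.MEMORY_ONLY_SER)            // serialized cache instead of MEMORY_AND_DISK
         .flatMap(x => outputPairsOfWords(x))               // ((word1, word2), 1.toShort) pairs, as in the quoted code
         .reduceByKey((a, b) => (a + b).toShort, 2000)      // map-side combine; 2000 partitions is an assumed value
         .filter { case ((w1, w2), count) => count >= 5 }   // keep only pairs seen at least 5 times

More partitions mean smaller per-task shuffle blocks, which can help when individual reduce tasks are running out of memory or timing out while fetching blocks.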