Hey Jens,

If you are trying to do an aggregation such as a count, the best way to do it
is with reduceByKey or (in newer versions of Spark) aggregateByKey:

    keyVals.reduceByKey(_ + _)
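For instance, a per-key count with either operator might look like the sketch
below. This is only a minimal sketch: the spark-shell SparkContext sc, the
input path, and the tab-delimited key extraction are placeholders standing in
for your own input format and produceKeyValTuple logic.

    // Placeholder input: substitute your own file format and key extraction.
    val keyvals = sc.textFile("hdfs:///path/to/input")
      .map(line => (line.split("\t")(0), line))

    // reduceByKey: emit a 1 per record and sum per key. The partial sums are
    // combined map-side before the shuffle, so no group of values is ever
    // materialised in memory.
    val countsViaReduce = keyvals.mapValues(_ => 1L).reduceByKey(_ + _)

    // aggregateByKey (Spark 1.0+): a zero value, a function that folds each
    // value into the per-partition accumulator, and a function that merges
    // accumulators from different partitions.
    val countsViaAggregate = keyvals.aggregateByKey(0L)((acc, _) => acc + 1L, _ + _)

    countsViaReduce.collect().foreach(println)

Both versions keep only a running Long per key on each executor, which is why
this style of aggregation scales where groupByKey().count() can run out of
memory.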
The groupBy operator in Spark is not an aggregation operator (e.g. in SQL,
where you do select sum(salary) ... group by age) - there are separate, more
efficient operators for aggregations. Currently groupBy requires that all of
the values for one key can fit in memory. In your case it's possible you have
a single key with a very large number of values, given that your count seems
to be failing on a single task.

In the latest version of Spark we've added documentation to make this
distinction clearer to users:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390

- Patrick

On Tue, Aug 5, 2014 at 6:13 AM, Jens Kristian Geyti <sp...@jkg.dk> wrote:

> I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, a
> few gigs in total, line based, 500-2000 chars per line). I'm running Spark
> on 8 low-memory machines in a yarn cluster, i.e. something along the lines
> of:
>
>     spark-submit ... --master yarn-client --num-executors 8 \
>       --executor-memory 3000m --executor-cores 1
>
> I'm trying to do a simple groupByKey (see below), but it fails with a
> java.lang.OutOfMemoryError: GC overhead limit exceeded exception:
>
>     val keyvals = sc.newAPIHadoopFile("hdfs://...")
>       .map( someobj.produceKeyValTuple )
>
>     keyvals.groupByKey().count()
>
> I can count the group sizes using reduceByKey without problems, which
> assures me the problem isn't caused by a single excessively large group,
> nor by an excessive number of groups:
>
>     keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b)
>       .collect().foreach(println)
>     // produces:
>     // (key1,139368)
>     // (key2,35335)
>     // (key3,392744)
>     // ...
>     // (key13,197941)
>
> I've tried reformatting, reshuffling and increasing the groupBy level of
> parallelism:
>
>     keyvals.groupByKey(24).count                        // fails
>     keyvals.groupByKey(3000).count                      // fails
>     keyvals.coalesce(24, true).groupByKey(24).count     // fails
>     keyvals.coalesce(3000, true).groupByKey(3000).count // fails
>
> I've also tried playing around with spark.default.parallelism, and
> increasing spark.shuffle.memoryFraction to 0.8 while lowering
> spark.storage.memoryFraction to 0.1.
>
> The failing stage (count) fails on task 2999 of 3000.
>
> I can't find anything suggesting that groupBy shouldn't just spill to disk
> instead of keeping things in memory, but I just can't get it to work right,
> even on fairly small datasets. This should obviously not be the case, and I
> must be doing something wrong, but I have no idea where to start debugging
> it, or even how to understand what's going on. For the same reason, I'm not
> looking for a solution to my specific problem so much as insight into how
> to reliably group datasets in Spark.
>
> Note that I've also posted this question on SO before realising this
> mailing list is more active. I will update the SO thread if I receive an
> answer here.
>
> ------------------------------
> View this message in context: Understanding RDD.GroupBy OutOfMemory
> Exceptions
> <http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.