Hey Jens,

If you are trying to do an aggregation such as a count, the best way to do
it is with reduceByKey or (in newer versions of Spark) aggregateByKey.

keyVals.reduceByKey(_ + _)
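
For example, here is a rough sketch (the keyVals name and the String/Int
types below are just placeholders for your own pair RDD) of counting the
values per key with reduceByKey, and the same count expressed with
aggregateByKey:

    import org.apache.spark.SparkContext._  // pair-RDD implicits in Spark 1.x
    import org.apache.spark.rdd.RDD

    // Map every value to 1 and sum - reduceByKey combines map-side before
    // the shuffle, so no per-key list of values is ever materialized.
    def valueCountsPerKey(keyVals: RDD[(String, Int)]): RDD[(String, Int)] =
      keyVals.mapValues(_ => 1).reduceByKey(_ + _)

    // The same thing with aggregateByKey: a zero value, a within-partition
    // update function, and a cross-partition merge function.
    def valueCountsPerKeyAgg(keyVals: RDD[(String, Int)]): RDD[(String, Int)] =
      keyVals.aggregateByKey(0)((acc, _) => acc + 1, _ + _)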

The groupBy operator in Spark is not an aggregation operator (as in SQL,
where you write select sum(salary) ... group by age) - there are separate,
more efficient operators for aggregations. Currently groupBy requires that
all of the values for one key fit in memory at once. In your case, it's
possible you have a single key with a very large number of values, given
that your count seems to be failing on a single task.
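
If the groupByKey().count() in your snippet is only there to count the
number of distinct keys, something along these lines should give you that
number without ever materializing the per-key groups (again, the names and
types are placeholders):

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Count distinct keys directly; no per-key value lists are built.
    def numGroups(keyVals: RDD[(String, String)]): Long =
      keyVals.keys.distinct().count()

    // Equivalent: collapse each key to a single record, then count records.
    def numGroupsViaReduce(keyVals: RDD[(String, String)]): Long =
      keyVals.mapValues(_ => 1).reduceByKey(_ + _).count()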

In the latest version of Spark, we've added documentation to make this
distinction clearer to users:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390

- Patrick


On Tue, Aug 5, 2014 at 6:13 AM, Jens Kristian Geyti <sp...@jkg.dk> wrote:

> I'm doing a simple groupBy on a fairly small dataset (80 files in HDFS, a
> few gigs in total, line-based, 500-2000 chars per line). I'm running Spark
> on 8 low-memory machines in a yarn cluster, i.e. something along the lines
> of:
>
>     spark-submit ... --master yarn-client --num-executors 8 \
>       --executor-memory 3000m --executor-cores 1
>
> I'm trying to do a simple groupByKey (see below), but it fails with a
> java.lang.OutOfMemoryError: GC overhead limit exceeded exception:
>
>     val keyvals = sc.newAPIHadoopFile("hdfs://...")
>                     .map( someobj.produceKeyValTuple )
>
>     keyvals.groupByKey().count()
>
> I can count the group sizes using reduceByKey without problems, assuring
> myself that the problem isn't caused by a single excessively large group,
> nor by an excessive number of groups:
>
>   keyvals.map(s => (s._1, 1)).reduceByKey((a, b) => a + b).collect().foreach(println)
>   // produces:
>   //  (key1,139368)
>   //  (key2,35335)
>   //  (key3,392744)
>   //  ...
>   //  (key13,197941)
>
> I've tried reformatting, reshuffling and increasing the groupBy level of
> parallelism:
>
>   keyvals.groupByKey(24).count // fails
>   keyvals.groupByKey(3000).count // fails
>   keyvals.coalesce(24, true).groupByKey(24).count // fails
>   keyvals.coalesce(3000, true).groupByKey(3000).count // fails
>
> I've tried playing around with spark.default.parallelism, and increasing
> spark.shuffle.memoryFraction to 0.8 while lowering
> spark.storage.memoryFraction to 0.1.
>
> The failing stage (count) will fail on task 2999 of 3000.
>
> I can't seem to find anything that suggests that groupBy shouldn't just
> spill to disk instead of keeping things in memory, but I just can't get it
> to work right, even on fairly small datasets. This should obviously not be
> the case, and I must be doing something wrong, but I have no idea where to
> start debugging this, or even how to understand what's going on - for the
> same reason, I'm not looking for a solution to my specific problem so much
> as insight into how to reliably group datasets in Spark.
>
> Note that I've also posted this question to SO before realising this
> mailing list is more active. I will update the SO thread if I receive an
> answer here.
>