Thanks a lot for that. There are definitely a lot of subtleties that we need to 
consider. We appreciate the thorough explanation!

-Matt Cheah

From: Aaron Davidson <ilike...@gmail.com>
Reply-To: "user@spark.incubator.apache.org" <user@spark.incubator.apache.org>
Date: Monday, December 9, 2013 12:28 PM
To: "user@spark.incubator.apache.org" <user@spark.incubator.apache.org>
Cc: Mingyu Kim <m...@palantir.com>
Subject: Re: groupBy() with really big groups fails

The issue is unfortunately a little complicated. The simple answer that I'd 
like to give is that increasing the number of reducers just increases the 
number of tasks launched, and Spark is good at handling a relatively large 
number of tasks due to its low scheduling latency.

Regarding scaling, you have two options. First, the default number of reducers 
is the same as the number of map partitions of the parent RDD, so something like
sc.parallelize(1 to 400000000, 256).groupBy(x => x % 10).takeSample(false, 10, 10)
would also produce 256 reducers. Since the number of input partitions usually 
scales naturally with the size of the dataset (e.g., with HDFS blocks), this is 
often a workable solution. Second, you can estimate the largest expected 
partition size from your input data and manually compute a good number of 
reducers from that. There is no hard-and-fast "good" number of reducers; it is 
extremely workload dependent.

Now for the caveats. A large number of reducers can actually be just as much an 
issue as a small number of reducers. If you have N map partitions and R 
reducers, we create N*R files on disk across the cluster in order to do the 
group by. Unfortunately, file systems tend to become inefficient at handling 
very large numbers of files (in the millions). In order to fix this, we have 
introduced a feature called "shuffle file consolidation" in 0.8.1 and beyond, 
which produces only C*R files (for C CPUs) rather than N*R. [Due to an issue 
with the ext3 file system and many-cored systems, however, this feature is 
turned off by default and must be explicitly enabled as 
"-Dspark.shuffle.consolidateFiles=true".]

The second caveat is that we maintain C*R open files during the map phase. Each 
open file requires its own compression buffer (~30KB for Snappy and up to 
~200KB for LZF) and output stream buffer (100KB by default). With 8 cores and 
10k reducers, this means we use 10-30GB by default on every machine just for 
buffering the outputs. You can reduce this by (A) using Snappy or no 
compression (LZF is the default) and (B) reducing the output stream buffer size 
via the "spark.shuffle.file.buffer.kb" property.

For some prior discussion on this, refer to this thread: 
http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3C20131121150056.51d81f9d@sh9%3E

Towards the future, we are looking into alleviating these troubles by using 
out-of-core reducing functions (so too few reducers means a slower job rather 
than a failed one) and using a constant number of files (or at least one that 
scales down as new machines are added) in order to make too many reducers a 
non-issue.


On Mon, Dec 9, 2013 at 11:55 AM, Matt Cheah <mch...@palantir.com> wrote:
Will using a large number of reducers have any performance implications on 
datasets of certain sizes? Ideally I'd like a single number of reducers that 
will be optimal for datasets anywhere from 100 GB to several terabytes in size, 
although I'm not sure how big the groups themselves will be (presumably 
proportional to the size of the dataset).

Thanks for the quick response!

-Matt Cheah

From: Aaron Davidson <ilike...@gmail.com>
Reply-To: "user@spark.incubator.apache.org" <user@spark.incubator.apache.org>
Date: Monday, December 9, 2013 11:51 AM
To: "user@spark.incubator.apache.org" <user@spark.incubator.apache.org>
Subject: Re: groupBy() with really big groups fails

This is very likely due to memory issues. The problem is that each "reducer" 
(partition of the groupBy) builds an in-memory table of that partition. If you 
have very few partitions, this will fail, so the solution is to simply increase 
the number of reducers. For example:
sc.parallelize(1 to 400000000).groupBy(x => x % 10, 256).takeSample(false, 10, 10)

See also the Tuning Spark documentation: 
http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks


On Mon, Dec 9, 2013 at 11:46 AM, Matt Cheah <mch...@palantir.com> wrote:
Hi everyone,

I was wondering if I could get any insight as to why the following query fails:

scala> sc.parallelize(1 to 400000000).groupBy(x => x % 10).takeSample(false, 10, 10)
<some generic stuff happens>
2013-12-09 19:42:30,756 [pool-3-thread-1] INFO  
org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to 
ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
2013-12-09 19:42:30,813 [connection-manager-thread] INFO  
org.apache.spark.network.ConnectionManager - Key not valid ? 
sun.nio.ch.SelectionKeyImpl@f2d755
2013-12-09 19:42:30,756 [pool-3-thread-2] INFO  
org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to 
ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
2013-12-09 19:42:30,814 [pool-3-thread-4] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
2013-12-09 19:42:30,814 [pool-3-thread-2] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
2013-12-09 19:42:30,814 [pool-3-thread-1] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
2013-12-09 19:42:30,815 [pool-3-thread-3] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
2013-12-09 19:42:30,818 [connection-manager-thread] INFO  
org.apache.spark.network.ConnectionManager - key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@f2d755
java.nio.channels.CancelledKeyException
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:266)
at 
org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:97)
2013-12-09 19:42:30,825 [pool-3-thread-4] INFO  
org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to 
ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
<house of cards collapses>

Let me know if more of the logs would be useful, although it seems from this 
point on everything falls apart.

I'm using a cluster launched with the EC2 scripts: 10 nodes, m2.4xlarge, 65.9GB 
of RAM per slave. Let me know if any other system configuration details are 
relevant.

In a more general sense, our use case involves large group-by queries, so I was 
trying to simulate this kind of workload in the Spark shell. Is there any good 
way to consistently get these kinds of queries to work? Assume that in the 
general use case it can't be known a priori how many groups there will be.

Thanks,

-Matt Cheah

