The issue is unfortunately a little complicated. The simple answer that I'd
like to give is that increasing the number of reducers just increases the
number of tasks launched, and Spark is good at handling a relatively large
number of tasks due to its low scheduling latency.

Regarding scaling, you have two options. First, the default number of
reducers is the same as the number of map partitions of the parent RDD, so
something like
sc.parallelize(1 to 400000000, *256*).groupBy(x => x % 10).takeSample(false, 10, 10)
would also produce 256 reducers. Since the number of input partitions
usually scales naturally with the size of the dataset (e.g., one per HDFS
block), this is often a workable solution. The second option is to estimate
the largest expected partition size from your input data and manually
compute a good number of reducers from that; a sketch follows below. There
is no hard-and-fast "good" number of reducers, as it is extremely workload
dependent.
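
For that second route, here is a minimal sketch of the kind of
back-of-the-envelope calculation I mean. The input size and the ~200MB
target partition size are illustrative assumptions only, not
recommendations:

// Pick a reducer count so each reduce partition stays near a target size.
// Both numbers below are assumptions for illustration only.
val inputSizeBytes = 500L * 1024 * 1024 * 1024   // e.g., ~500 GB of input (your estimate)
val targetPartitionBytes = 200L * 1024 * 1024    // aim for ~200 MB per reduce partition
val numReducers = math.max(1, (inputSizeBytes / targetPartitionBytes).toInt)

// Pass the reducer count explicitly to the shuffle operation:
val grouped = sc.parallelize(1 to 400000000).groupBy(x => x % 10, numReducers)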

Now for the caveats. A large number of reducers can actually be just as
much an issue as a small number of reducers. If you have N map partitions
and R reducers, we create N*R files on disk across the cluster in order to
do the group by. Unfortunately, file systems tend to become inefficient at
handling very large numbers of files (in the millions). In order to fix
this, we have introduced a feature called "shuffle file consolidation" in
0.8.1 and beyond, which produces only C*R files (for C CPUs) rather than
N*R. [Due to an issue with the ext3 file system on machines with many
cores, however, this feature is turned off by default and must be
explicitly enabled with "-Dspark.shuffle.consolidateFiles=true".]
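
To make that concrete, you can either pass the JVM flag above (e.g., via
SPARK_JAVA_OPTS) or set the system property in the driver before the
SparkContext is created. A sketch, with a made-up master URL:

// Enable shuffle file consolidation (0.8.1+) before constructing the SparkContext.
System.setProperty("spark.shuffle.consolidateFiles", "true")
val sc = new SparkContext("spark://your-master:7077", "consolidation-example")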

The second caveat is that we maintain C*R *open* files during the map
phase. Each open file requires its own compression buffer (~30KB for Snappy
and up to ~200KB for LZF) and output stream buffer (100KB by default). With
8 cores and 10k reducers, this means roughly 10-30 GB on every machine
just for buffering the outputs. You can reduce this by (A) using Snappy or
no compression (LZF is the default) and (B) reducing the output stream
buffer size via the "spark.shuffle.file.buffer.kb" property.
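
As a rough worked version of that arithmetic (buffer sizes are the
approximate figures above, and the property/codec names are as I recall
them for 0.8.x, so double-check against your version):

// Per-machine buffer estimate: cores * reducers * (compression buffer + stream buffer).
val cores = 8
val reducers = 10000
val lzfBufferKB = 200     // ~200 KB compression buffer per open file with LZF (approx.)
val snappyBufferKB = 30   // ~30 KB with Snappy (approx.)
val streamBufferKB = 100  // default spark.shuffle.file.buffer.kb
val lzfGB = cores.toLong * reducers * (lzfBufferKB + streamBufferKB) / (1024.0 * 1024)
val snappyGB = cores.toLong * reducers * (snappyBufferKB + streamBufferKB) / (1024.0 * 1024)
// lzfGB ~ 23, snappyGB ~ 10 -- hence the 10-30 GB range above.

// To shrink this, switch the codec and/or lower the stream buffer:
System.setProperty("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
System.setProperty("spark.shuffle.file.buffer.kb", "25")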

For some prior discussion on this, refer to this thread:
http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3C20131121150056.51d81f9d@sh9%3E

Looking ahead, we are working to alleviate these issues by using
out-of-core reduce functions (so that too few reducers means a slower job
rather than a failed one) and by using a constant number of shuffle files
(or at least one that scales down as new machines are added), in order to
make too many reducers a non-issue.


On Mon, Dec 9, 2013 at 11:55 AM, Matt Cheah <[email protected]> wrote:

>  Will using a large number of reducers have any performance implications
> on certain sized datasets? Ideally I'd like a single number of reducers
> that will be optimal for data sets anywhere between 100 GB to several
> terabytes in size – although I'm not sure how big the groups themselves
> will be (presumably proportional to the size of the dataset).
>
>  Thanks for the quick response!
>
>  -Matt Cheah
>
>   From: Aaron Davidson <[email protected]>
> Reply-To: "[email protected]" <
> [email protected]>
> Date: Monday, December 9, 2013 11:51 AM
> To: "[email protected]" <[email protected]>
> Subject: Re: groupBy() with really big groups fails
>
>   This is very likely due to memory issues. The problem is that each
> "reducer" (partition of the groupBy) builds an in-memory table of that
> partition. If you have very few partitions, this will fail, so the solution
> is to simply increase the number of reducers. For example:
> sc.parallelize(1 to 400000000).groupBy(x => x % 10, *256*).takeSample(false,
> 10, 10)
>
>  See also: the Tuning Spark documentation at
> http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
>
>
> On Mon, Dec 9, 2013 at 11:46 AM, Matt Cheah <[email protected]> wrote:
>
>>  Hi everyone,
>>
>>  I was wondering if I could get any insight as to why the following
>> query fails:
>>
>>  scala> sc.parallelize(1 to 400000000).groupBy(x => x %
>> 10).takeSample(false, 10, 10)
>> <some generic stuff happens>
>>  2013-12-09 19:42:30,756 [pool-3-thread-1] INFO
>>  org.apache.spark.network.ConnectionManager - Removing ReceivingConnection
>> to ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
>> 2013-12-09 19:42:30,813 [connection-manager-thread] INFO
>>  org.apache.spark.network.ConnectionManager - Key not valid ?
>> sun.nio.ch.SelectionKeyImpl@f2d755
>> 2013-12-09 19:42:30,756 [pool-3-thread-2] INFO
>>  org.apache.spark.network.ConnectionManager - Removing ReceivingConnection
>> to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
>> 2013-12-09 19:42:30,814 [pool-3-thread-4] INFO
>>  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
>> ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
>> 2013-12-09 19:42:30,814 [pool-3-thread-2] INFO
>>  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
>> ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
>> 2013-12-09 19:42:30,814 [pool-3-thread-1] INFO
>>  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
>> ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
>> 2013-12-09 19:42:30,815 [pool-3-thread-3] INFO
>>  org.apache.spark.network.ConnectionManager - Removing SendingConnection to
>> ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
>> 2013-12-09 19:42:30,818 [connection-manager-thread] INFO
>>  org.apache.spark.network.ConnectionManager - key already cancelled ?
>> sun.nio.ch.SelectionKeyImpl@f2d755
>> java.nio.channels.CancelledKeyException
>> at
>> org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:266)
>> at
>> org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:97)
>> 2013-12-09 19:42:30,825 [pool-3-thread-4] INFO
>>  org.apache.spark.network.ConnectionManager - Removing ReceivingConnection
>> to ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
>> <house of cards collapses>
>>
>>  Let me know if more of the logs would be useful, although it seems from
>> this point on everything falls apart.
>>
>>  I'm using an EC2-script launched cluster, 10 nodes, m2.4xlarge, 65.9GB
>> of RAM per slave. Let me know if any other system configurations are
>> relevant.
>>
>>  In a more general sense – our use case has been involving large
>> group-by queries, so I was trying to simulate this kind of workload in the
>> spark shell. Is there any good way to consistently get these kinds of
>> queries to work? Assume that during the general use-case it can't be known
>> a-priori how many groups there will be.
>>
>>  Thanks,
>>
>>  -Matt Cheah
>>
>
>
