Will using a large number of reducers have any performance implications for datasets of certain sizes? Ideally I'd like a single number of reducers that will be optimal for datasets anywhere from 100 GB to several terabytes in size, although I'm not sure how big the groups themselves will be (presumably proportional to the size of the dataset).
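
For concreteness, here is a rough sketch of the kind of sizing I had in mind instead of one fixed number; the input-size estimate and the 128 MB-per-partition target below are placeholder assumptions, not measured values:

// Sketch: derive the reducer count from an estimated input size instead of
// hard-coding a single value for every dataset. Both constants are placeholders.
val estimatedInputBytes = 500L * 1024 * 1024 * 1024   // assume roughly 500 GB of input
val targetPartitionBytes = 128L * 1024 * 1024         // assume ~128 MB per reduce partition
val numReducers = math.max(256, (estimatedInputBytes / targetPartitionBytes).toInt)
sc.parallelize(1 to 400000000).groupBy(x => x % 10, numReducers).takeSample(false, 10, 10)

Would a count chosen this way behave reasonably across that whole range of sizes, or is there a smarter default?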

Thanks for the quick response!

-Matt Cheah

From: Aaron Davidson <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, December 9, 2013 11:51 AM
To: "[email protected]" <[email protected]>
Subject: Re: groupBy() with really big groups fails

This is very likely due to memory issues. The problem is that each "reducer" (partition of the groupBy) builds an in-memory table of its entire partition. If you have very few partitions, that table can be too large to fit in memory, so the solution is simply to increase the number of reducers. For example:
sc.parallelize(1 to 400000000).groupBy(x => x % 10, 256).takeSample(false, 10, 10)

See also the Tuning Spark documentation: http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
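
As a rough sketch of the same idea without hard-coding the count (the 4x multiplier and the 256 floor below are just assumed starting points, not tuned values):

// Sketch: scale the number of reduce partitions with the input's partition count
// rather than hard-coding it; the multiplier and the floor here are assumptions.
// The tuning page linked above also describes spark.default.parallelism for
// changing the default used when no count is passed explicitly.
val data = sc.parallelize(1 to 400000000)
val numReducers = math.max(256, data.partitions.length * 4)
data.groupBy(x => x % 10, numReducers).takeSample(false, 10, 10)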


On Mon, Dec 9, 2013 at 11:46 AM, Matt Cheah <[email protected]> wrote:
Hi everyone,

I was wondering if I could get any insight as to why the following query fails:

scala> sc.parallelize(1 to 400000000).groupBy(x => x % 10).takeSample(false, 10, 10)
<some generic stuff happens>
2013-12-09 19:42:30,756 [pool-3-thread-1] INFO  
org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to 
ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
2013-12-09 19:42:30,813 [connection-manager-thread] INFO  
org.apache.spark.network.ConnectionManager - Key not valid ? 
sun.nio.ch.SelectionKeyImpl@f2d755
2013-12-09 19:42:30,756 [pool-3-thread-2] INFO  
org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to 
ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
2013-12-09 19:42:30,814 [pool-3-thread-4] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
2013-12-09 19:42:30,814 [pool-3-thread-2] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
2013-12-09 19:42:30,814 [pool-3-thread-1] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
2013-12-09 19:42:30,815 [pool-3-thread-3] INFO  
org.apache.spark.network.ConnectionManager - Removing SendingConnection to 
ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
2013-12-09 19:42:30,818 [connection-manager-thread] INFO  
org.apache.spark.network.ConnectionManager - key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@f2d755
java.nio.channels.CancelledKeyException
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:266)
at 
org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:97)
2013-12-09 19:42:30,825 [pool-3-thread-4] INFO  
org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to 
ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
<house of cards collapses>

Let me know if more of the logs would be useful, although it seems from this 
point on everything falls apart.

I'm using a cluster launched with the EC2 scripts: 10 nodes of m2.4xlarge, with 65.9 GB of RAM per slave. Let me know if any other system configurations are relevant.

More generally, our use case involves large group-by queries, so I was trying to simulate that kind of workload in the Spark shell. Is there a good way to consistently get these kinds of queries to work? Assume that in the general case it can't be known a priori how many groups there will be.
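
Purely as an illustration of what I mean by really big groups (not a query we actually run), the same workload with far fewer, and therefore much larger, groups:

// Illustrative only: far fewer groups, so each group is much larger; the modulus is arbitrary.
sc.parallelize(1 to 400000000).groupBy(x => x % 2).takeSample(false, 10, 10)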

Thanks,

-Matt Cheah
