This is very likely due to memory issues. The problem is that each "reducer" (partition of the groupBy) builds an in-memory table of its partition of the data. If you have very few partitions, this will fail, so the solution is to simply increase the number of reducers. For example:

sc.parallelize(1 to 400000000).groupBy(x => x % 10, 256).takeSample(false, 10, 10)
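For context, the per-reducer memory pressure comes from how Spark hash-partitions the groupBy keys: a key lands in partition key.hashCode % numPartitions (made non-negative), and each reducer buffers all values of every group hashed to it. The sketch below is plain Scala (no Spark required) that mirrors that rule to show why more partitions means smaller per-reducer tables; partitionFor, withFew, and withMany are illustrative names, not Spark APIs.

```scala
// Mirror of Spark's HashPartitioner assignment: hashCode modulo
// numPartitions, adjusted to be non-negative.
def partitionFor(key: Int, numPartitions: Int): Int = {
  val mod = key.hashCode % numPartitions
  if (mod < 0) mod + numPartitions else mod
}

// The query's keys are x % 10, i.e. only 10 distinct groups exist.
val keys = 0 until 10

val withFew  = keys.groupBy(k => partitionFor(k, 2))   // 2 reducers
val withMany = keys.groupBy(k => partitionFor(k, 256)) // 256 reducers

// With 2 partitions, one reducer holds 5 of the 10 groups, i.e. roughly
// half of all 400M values at once; with 256 partitions, no reducer ever
// holds more than one group.
println(withFew.values.map(_.size).max)   // 5
println(withMany.values.map(_.size).max)  // 1
```

Note that more partitions only spreads distinct groups across reducers; a single enormous group still lands entirely on one reducer.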
See also the Tuning Spark documentation:
http://spark.incubator.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks

On Mon, Dec 9, 2013 at 11:46 AM, Matt Cheah <mch...@palantir.com> wrote:
> Hi everyone,
>
> I was wondering if I could get any insight as to why the following query
> fails:
>
> scala> sc.parallelize(1 to 400000000).groupBy(x => x % 10).takeSample(false, 10, 10)
> <some generic stuff happens>
> 2013-12-09 19:42:30,756 [pool-3-thread-1] INFO org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
> 2013-12-09 19:42:30,813 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager - Key not valid ? sun.nio.ch.SelectionKeyImpl@f2d755
> 2013-12-09 19:42:30,756 [pool-3-thread-2] INFO org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
> 2013-12-09 19:42:30,814 [pool-3-thread-4] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
> 2013-12-09 19:42:30,814 [pool-3-thread-2] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-199.us-west-1.compute.internal,43961)
> 2013-12-09 19:42:30,814 [pool-3-thread-1] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-201.us-west-1.compute.internal,50351)
> 2013-12-09 19:42:30,815 [pool-3-thread-3] INFO org.apache.spark.network.ConnectionManager - Removing SendingConnection to ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
> 2013-12-09 19:42:30,818 [connection-manager-thread] INFO org.apache.spark.network.ConnectionManager - key already cancelled ? sun.nio.ch.SelectionKeyImpl@f2d755
> java.nio.channels.CancelledKeyException
>     at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:266)
>     at org.apache.spark.network.ConnectionManager$$anon$3.run(ConnectionManager.scala:97)
> 2013-12-09 19:42:30,825 [pool-3-thread-4] INFO org.apache.spark.network.ConnectionManager - Removing ReceivingConnection to ConnectionManagerId(ip-172-31-13-203.us-west-1.compute.internal,43126)
> <house of cards collapses>
>
> Let me know if more of the logs would be useful, although it seems that
> from this point on everything falls apart.
>
> I'm using an EC2-script-launched cluster: 10 nodes, m2.4xlarge, 65.9 GB
> of RAM per slave. Let me know if any other system configurations are
> relevant.
>
> More generally, our use case involves large group-by queries, so I was
> trying to simulate this kind of workload in the Spark shell. Is there any
> good way to consistently get these kinds of queries to work? Assume that
> in the general use case it can't be known a priori how many groups there
> will be.
>
> Thanks,
>
> -Matt Cheah