Hey Stephen,

The issue is this. When you do a shuffle from N map partitions to M reduce partitions, there are N * M output blocks created and each one is tracked. That's why all these per-block overheads are causing you to OOM.
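To put rough numbers on it: with ~7,000 partitions on the map side and ~7,000 on the reduce side, that's on the order of 7,000 * 7,000 = 49 million blocks being tracked, so even a small per-block bookkeeping cost (say 100 bytes, just as an illustrative figure) works out to several gigabytes.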
One way to fix this is to reduce the per-block overhead, which Josh and Aaron are discussing. It actually used to be even worse: we used to also create N * M files, but Aaron recently submitted a patch to decrease the number of files.

Another way to fix it is to modify your job to create fewer partitions. Try coalescing the input into fewer partitions before you shuffle. For instance, call coalesce(100) on the input beforehand (or play with different values here). You should also try explicitly setting fewer partitions on the reduce side when you do the cogroup (there's a rough sketch of both at the very bottom of this mail). Since you only have a few machines, you don't "need" the extra partitions to add more parallelism to the reduce. So by playing with the map and/or reduce partitions you can probably fix this. At least, I'm interested in hearing how that works!

In general, people usually don't hit this, because it's not so common to have thousands of reducers *per machine* (i.e. most machines only have a few cores anyway).

- Patrick

On Sat, Oct 26, 2013 at 1:54 PM, Aaron Davidson <[email protected]> wrote:
> Thanks again for the great detail! Your setup sounds correct, and all the
> numbers you're seeing suggest that no foul play (i.e., bug) is at work. I
> think this is a simple case of the blockInfo and blockToFileSegmentMap being
> poorly optimized, especially the latter. I will look into reducing the
> memory footprint of the blockToFileSegmentMap using similar techniques as
> Josh mentioned for the blockInfo map. I've created a jira
> (SPARK-946 <https://spark-project.atlassian.net/browse/SPARK-946>)
> to track this issue.
>
> In the meantime, 4GB of memory does seem really low, so more or
> higher-memory machines is probably the only route to get your job running
> successfully right now.
>
>
> On Sat, Oct 26, 2013 at 1:13 PM, Stephen Haberman
> <[email protected]> wrote:
>
>> Hi Patrick,
>>
>> > Just wondering, how many reducers are you using in this shuffle? By
>> > 7,000 partitions, I'm assuming you mean the map side of the shuffle.
>> > What about the reduce side?
>>
>> 7,000 on that side as well.
>>
>> We're loading about a month's worth of data in one RDD, with ~7,000
>> partitions, and cogrouping it with another RDD with 50 partitions, and
>> the resulting RDD also has 7,000 partitions.
>>
>> (And, since we don't have spark.default.parallelism set, the
>> defaultPartitioner logic chooses the max of [50, 7,000] to be the next
>> partition size.)
>>
>> I believe that is what you're asking by number of reducers? The number
>> of partitions in the post-cogroup ShuffledRDD?
>>
>> Also, AFAICT, I don't believe we get to the reduce/ShuffledRDD side of
>> this cogroup--after 2,000-3,000 ShuffleMapTasks on the map side is when
>> it bogs down.
>>
>> Thanks,
>> Stephen
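P.S. Here's the rough sketch I mentioned above (untested, written against the Scala RDD API; sc, bigRdd, and smallRdd are just placeholder names standing in for your SparkContext, the ~7,000-partition month of data, and the 50-partition RDD, and 100 is only an arbitrary starting point to tune):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._  // implicits for pair-RDD ops like cogroup

    val sc = new SparkContext("local[4]", "coalesce-sketch")

    // Stand-in data; in your job these would be the month-of-data RDD (~7,000
    // partitions) and the smaller 50-partition RDD.
    val bigRdd   = sc.parallelize(1 to 1000000, 7000).map(i => (i % 1000, i))
    val smallRdd = sc.parallelize(1 to 1000, 50).map(i => (i % 1000, i.toString))

    // 1) Fewer map-side partitions: coalesce before the shuffle.
    val fewerMaps = bigRdd.coalesce(100)

    // 2) Fewer reduce-side partitions: pass an explicit partition count to cogroup
    //    instead of letting the default partitioner pick the max of the inputs.
    val grouped = fewerMaps.cogroup(smallRdd, 100)

    grouped.count()  // force the shuffle

With both sides at 100 you'd be tracking ~100 * 100 = 10,000 shuffle blocks instead of ~49 million, at the cost of larger (but far fewer) tasks.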
