Hey Stephen,

The issue is this: when you do a shuffle from N map partitions to M reduce
partitions, N * M output blocks are created and each one is tracked.
That's why all of these per-block overheads are causing you to OOM.
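
To put rough numbers on your case (back-of-the-envelope only; the ~100
bytes of bookkeeping per block is an assumption for illustration, not a
measured figure):

    // N map partitions x M reduce partitions = N * M shuffle blocks tracked
    val mapPartitions    = 7000L
    val reducePartitions = 7000L
    val blocks = mapPartitions * reducePartitions   // 49,000,000 blocks

    // Assuming on the order of ~100 bytes of metadata per block (hypothetical),
    // that's roughly 4.9 GB of shuffle bookkeeping, which is more than a 4GB heap.
    val approxBytes = blocks * 100L                 // ~4.9e9 bytes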

One way to fix this is to reduce the per-block overhead, which Josh and
Aaron are discussing. It actually used to be even worse: we used to also
create N * M files, but Aaron recently submitted a patch to decrease the
number of files.

Another way to fix it is to modify your job to create fewer partitions. Try
coalesce'ing the input into fewer partitions before you shuffle. For
instance, call coalesce(100) on the input before the shuffle (and play with
different values here). You should also try explicitly setting fewer
partitions on the reduce side when you do the cogroup. Since you only have
a few machines, you don't "need" the extra partitions to add more
parallelism to the reduce. So by playing with the map and/or reduce
partitions you can probably fix this.
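
Here's a rough sketch of what I mean. The data and partition counts are
placeholders, and I'm creating a SparkContext just to make it
self-contained; in your job you'd use your existing sc and real inputs:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // brings cogroup into scope on pair RDDs

    val sc = new SparkContext("local", "shuffle-sizing-sketch")

    // Stand-ins for your month-of-data RDD (~7,000 partitions) and the
    // smaller 50-partition RDD.
    val bigInput   = sc.parallelize(1 to 1000000).map(i => (i % 1000, i.toLong))
    val smallInput = sc.parallelize(1 to 1000).map(i => (i % 1000, i.toString))

    // Shrink the map side before the shuffle...
    val fewerMapPartitions = bigInput.coalesce(100)

    // ...and explicitly cap the reduce side in the cogroup, instead of letting
    // the default partitioner pick max(7000, 50) = 7000 partitions.
    val grouped = fewerMapPartitions.cogroup(smallInput, 100)

    grouped.count()

With 100 x 100 instead of 7,000 x 7,000, you're tracking 10,000 blocks
rather than ~49 million, so the per-block metadata overhead goes away.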

In any case, I'm interested in hearing how that works!

In general, people usually don't hit this, because it's not so common to
have thousands of reducers *per machine* (i.e. most machines only have a
few cores anyway).

- Patrick




On Sat, Oct 26, 2013 at 1:54 PM, Aaron Davidson <[email protected]> wrote:

> Thanks again for the great detail! Your setup sounds correct, and all the
> numbers you're seeing suggest that no foul play (i.e., bug) is at work. I
> think this is a simple case of the blockInfo and blockToFileSegmentMap being
> poorly optimized, especially the latter. I will look into reducing the
> memory footprint of the blockToFileSegmentMap using similar techniques as
> Josh mentioned for the blockInfo map. I've created a jira (SPARK-946,
> https://spark-project.atlassian.net/browse/SPARK-946) to track this issue.
>
> In the meantime, 4GB of memory does seem really low, so more or
> higher-memory machines are probably the only route to get your job running
> successfully right now.
>
>
> On Sat, Oct 26, 2013 at 1:13 PM, Stephen Haberman <
> [email protected]> wrote:
>
>> Hi Patrick,
>>
>> > Just wondering, how many reducers are you using in this shuffle? By
>> > 7,000 partitions, I'm assuming you mean the map side of the shuffle.
>> > What about the reduce side?
>>
>> 7,000 on that side as well.
>>
>> We're loading about a month's worth of data in one RDD, with ~7,000
>> partitions, and cogrouping it with another RDD with 50 partitions, and
>> the resulting RDD also has 7,000 partitions.
>>
>> (Since we don't have spark.default.parallelism set, the
>> defaultPartitioner logic chooses the max of [50, 7,000] as the next
>> partition count.)
>>
>> I believe that is what you're asking by number of reducers? The number
>> of partitions in the post-cogroup ShuffledRDD?
>>
>> Also, AFAICT, I don't believe we get to the reduce/ShuffledRDD side of
>> this cogroup--after 2,000-3,000 ShuffleMapTasks on the map side is when
>> it bogs down.
>>
>> Thanks,
>> Stephen
>>
>>
>
