As for the second part of your question: we have a fairly complex join
process that requires a lot of stage orchestration from our driver. I've
written some code that walks down our DAG tree and executes siblings in
the tree concurrently where possible (forcing a cache to disk on children
that have multiple children themselves so that they can be run
concurrently). Ultimately, we have seen a significant speedup in our jobs
by keeping tasks as busy as possible processing concurrent stages. Funny
enough, though, the stage that is causing shuffle problems for us has a
lot of children and doesn't even run concurrently with any other stages,
so I ruled out stage concurrency as the culprit for the shuffling problem
we're seeing.
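
A minimal sketch of that kind of orchestration, in plain Python with no
Spark dependency. It is an illustration under assumptions, not the actual
code (which is not shared in this thread): Node, run, persisted and
execute_tree are hypothetical names, each run callable stands in for
triggering a stage, and the persisted flag stands in for something like
rdd.persist(StorageLevel.DISK_ONLY). As a simplification, it runs all
nodes at the same depth concurrently, wave by wave.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    """Hypothetical DAG tree node; `run` stands in for triggering a stage."""
    name: str
    run: Callable[[], None]
    children: List["Node"] = field(default_factory=list)
    persisted: bool = False


def _run_node(node: Node) -> None:
    # A node with several children is flagged for disk caching before it
    # runs, so its children can reuse its output instead of recomputing it.
    # (Assumption: in Spark this would be a DISK_ONLY persist.)
    if len(node.children) > 1:
        node.persisted = True
    node.run()


def execute_tree(root: Node, max_workers: int = 4) -> None:
    """Walk the tree top-down, running each depth level concurrently."""
    frontier = [root]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while frontier:
            # list() forces completion of the whole wave and re-raises
            # any exception thrown by a node.
            list(pool.map(_run_node, frontier))
            # Parents are finished; their children form the next wave.
            frontier = [child for node in frontier for child in node.children]
```

Running one wave at a time avoids submitting work from inside worker
threads, which can deadlock a bounded pool. The cost is a barrier between
depth levels, so a finer-grained scheduler could keep executors busier.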

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:

> Igor,
>
> I don't think the question is "why can't it fit stuff in memory". I know
> why it can't fit stuff in memory- because it's a large dataset that needs
> to have a reduceByKey() run on it. My understanding is that when it doesn't
> fit into memory it needs to spill in order to consolidate intermediary
> files into a single file. The more data you need to run through this, the
> more it will need to spill. My finding is that once it gets stuck in this
> spill chain with our dataset, it's all over at that point because it will
> spill and spill and spill and spill and spill. If I give the shuffle enough
> memory it won't, irrespective of the number of partitions we have (I've
> done everything from repartition(500) to repartition(2500)). It's not a
> matter of running out of memory on a single node because the data is
> skewed. It's more a matter of the shuffle buffer filling up and needing to
> spill. I think what may be happening is that it gets to a point where it's
> spending more time reading from and writing to disk during the spills than
> it is actually processing any data. I can tell this because I can see that
> the spills sometimes get up into the tens to hundreds of TB where the input
> data was maybe 100 GB at most. Unfortunately my code is on a private
> internal network and I'm not able to share it.
>
> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Can you provide code snippets? It would be especially interesting to see
>> what your transformation chain is and how many partitions there are on
>> each side of the shuffle operation.
>>
>> The question is why it can't fit stuff in memory when you are shuffling.
>> Maybe your partitioner on the "reduce" side is not configured properly? I
>> mean, if the map side is OK and you are just reducing by key or something,
>> it should be OK, so some detail is missing... skewed data? aggregate by key?
>>
>> On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Igor,
>>>
>>> Thank you for the response but unfortunately, the problem I'm referring
>>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>>> map side but didn't do much for the spilling when there was no longer any
>>> memory left for the shuffle. Also the new auto-memory management doesn't
>>> seem like it'll have too much of an effect after I've already given most
>>> of the memory I've allocated to the shuffle. The problem I'm having is most
>>> specifically related to the shuffle performance declining by several orders
>>> of magnitude when it needs to spill multiple times (it ends up spilling
>>> several hundred times for me when it can't fit stuff into memory).
>>>
>>>
>>>
>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> usually you can solve this in 2 steps:
>>>> 1. make the RDD have more partitions
>>>> 2. play with the shuffle memory fraction
>>>>
>>>> in Spark 1.6, cache vs shuffle memory fractions are adjusted
>>>> automatically
>>>>
>>>> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> I just recently discovered that my jobs were taking several hours to
>>>>> complete because of excess shuffle spills. What I found was that when
>>>>> I hit the high point where I didn't have enough memory for the shuffles to
>>>>> store all of their file consolidations at once, it could spill so many
>>>>> times that it caused my job's runtime to increase by orders of magnitude
>>>>> (and sometimes fail altogether).
>>>>>
>>>>> I've played with all the tuning parameters I can find. To speed the
>>>>> shuffles up, I tuned the akka threads to different values. I also tuned
>>>>> the shuffle buffering a tad (both up and down).
>>>>>
>>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>>> space with reducers, and the shuffles need enough memory to consolidate
>>>>> and pull; otherwise they will need to spill and spill and spill. What I've
>>>>> noticed about my jobs is that this is the difference between them taking
>>>>> 30 minutes and 4 hours or more. Same job, just different memory tuning.
>>>>>
>>>>> I've found that, as a result of the spilling, I'm better off not
>>>>> caching any data in memory, lowering my storage fraction to 0, and
>>>>> hoping I've given my shuffles enough memory that my data doesn't
>>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>>> because it seems like it forces the memory limits on my job; otherwise it
>>>>> could take orders of magnitude longer to execute.
>>>>>
>>>>>
>>>>
>>>
>>
>
