GitHub user tgravescs opened a pull request:

    https://github.com/apache/spark/pull/12113

    [SPARK-1239] Improve fetching of map output statuses

    The main issue we are trying to solve is memory bloat on the driver when tasks request the map output statuses. With a large number of tasks you either need a huge amount of memory on the driver or you have to repartition to a smaller number of tasks, which makes it really difficult to run jobs with, say, over 50,000 tasks.
    
    The main issues that cause the memory bloat are:
    1) No flow control on sending the map output status responses. We serialize the map output statuses and then hand them off to Netty to send. Netty sends asynchronously and can't keep up with the incoming requests, so we end up with lots of copies of the serialized map output statuses sitting in memory. This causes huge bloat when you have tens of thousands of tasks and the map output statuses are in the tens of MB.
    2) When the initial reduce tasks start up, they all request the map output statuses from the driver. These requests are handled by multiple threads in parallel, so even though we check for a cached version, initially, when no cached version exists yet, many of the initial requests can all end up serializing the exact same map output statuses (sketched below).
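
    For illustration, here is a minimal Scala sketch of the problematic pattern, with hypothetical names standing in for the actual Spark internals: every handler serializes its own copy of the statuses and hands the bytes to an asynchronous sender, so nothing bounds how many multi-MB buffers are alive at once.

    ```scala
    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.util.zip.GZIPOutputStream

    // Hypothetical sketch (not the actual Spark code) of the pre-patch pattern.
    object UnboundedResponder {
      def serialize(statuses: Array[Long]): Array[Byte] = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(new GZIPOutputStream(buffer))
        try out.writeObject(statuses) finally out.close()
        buffer.toByteArray // tens of MB when there are tens of thousands of tasks
      }

      // Called concurrently by many dispatcher threads: each in-flight request
      // holds yet another full serialized copy until the async sender drains it.
      def handleRequest(statuses: Array[Long], sendAsync: Array[Byte] => Unit): Unit =
        sendAsync(serialize(statuses))
    }
    ```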
    
    This patch does a couple of things:
    - When the serialized map output statuses are over a threshold (default 512 KB), they are sent via a broadcast variable. This means we no longer put the large serialized map output statuses into the RPC responses, so we avoid the memory bloat: the RPC messages are now in the 300-400 byte range, and the map output statuses themselves travel over the broadcast mechanism. If the size is under the threshold, it is sent as before, except that the message now carries a DIRECT indicator.
    - Synchronize the incoming requests so that a single thread serializes the map output statuses, caches the result, and creates the broadcast, which all the other requests then reuse. This ensures we don't create multiple broadcast variables when we don't need to. To make that safe I added a second thread pool that the Dispatcher hands these requests to, so those threads can block without blocking the main dispatcher threads (which would stop things like heartbeats from coming through). A sketch of the combined flow follows this list.
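
    A minimal sketch of that flow, again with hypothetical names and a hardcoded threshold standing in for the configurable one; the real change lives in Spark's MapOutputTracker and RPC layer:

    ```scala
    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    sealed trait StatusResponse
    case class Direct(bytes: Array[Byte]) extends StatusResponse
    case class ViaBroadcast(handle: Broadcast[Array[Byte]]) extends StatusResponse

    class CachedStatusServer(sc: SparkContext, minSizeForBroadcast: Int = 512 * 1024) {
      private var cached: Option[StatusResponse] = None

      // Runs on the dedicated thread pool rather than the RPC dispatcher threads,
      // so blocking on this lock cannot starve heartbeats and other control messages.
      def respond(serialize: () => Array[Byte]): StatusResponse = synchronized {
        cached.getOrElse {
          val bytes = serialize() // only the first request pays the serialization cost
          val response =
            if (bytes.length >= minSizeForBroadcast) ViaBroadcast(sc.broadcast(bytes))
            else Direct(bytes)
          cached = Some(response)
          response
        }
      }
    }
    ```

    On the executor side, a DIRECT response carries the serialized statuses inline, while a broadcast-backed response only carries the broadcast handle, so the RPC message stays tiny and the bulk of the data moves over the broadcast mechanism.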
    
    Note that some of the design and code was contributed by @mridulm.
    
    ## How was this patch tested?
    
    Unit tests and a lot of manual testing.
    Ran with both the Akka and Netty RPC implementations, and with dynamic allocation both on and off.
    
    One of the large jobs I used to test this was a join over 15 TB of data. It had 200,000 map tasks and 20,000 reduce tasks, with executor counts ranging from 200 to 2,000. This job ran successfully with 5 GB of memory on the driver with these changes; without them I had to use 20 GB and could only run 500 reduce tasks. The job has 50 MB of serialized map output statuses, and the executors took roughly the same amount of time to fetch them as before.
    
    Ran a variety of other jobs, from large wordcounts down to small jobs that stay under the threshold and don't use the broadcast path.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tgravescs/spark SPARK-1239

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/12113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #12113
    
----
commit 8e4f2efea08b7013a2702543dc3860f9d277e3ac
Author: Thomas Graves <[email protected]>
Date:   2016-04-01T17:44:44Z

    [SPARK-1239] Don't fetch all map output statuses at each reducer during 
shuffles

commit 3c1def02b80ebfed55904e504609aa02de6559ce
Author: Thomas Graves <[email protected]>
Date:   2016-04-01T18:49:49Z

    Update unit test

----

