GitHub user tgravescs opened a pull request:
https://github.com/apache/spark/pull/12113
[SPARK-1239] Improve fetching of map output statuses
The main issue we are trying to solve is memory bloat on the Driver
when tasks request the map output statuses. With a large number of tasks you
either need a huge amount of memory on the Driver or you have to repartition to
a smaller number of tasks, which makes it really difficult to run jobs with,
say, more than 50,000 tasks.
The main issues that cause the memory bloat are:
1) There is no flow control on sending the map output status responses. We
serialize the map output statuses and then hand them off to Netty to send. Netty
sends asynchronously and can't keep up with the incoming requests, so we end up
with many copies of the serialized map output statuses sitting in memory. This
causes huge bloat when you have tens of thousands of tasks and the serialized
map output statuses are tens of MB.
2) When the initial reduce tasks start up, they all request the map
output statuses from the Driver. These requests are handled by multiple threads
in parallel, so even though we check for a cached version, before one exists
many of the initial requests can all end up serializing the exact same map
output statuses. A sketch of this race follows the list.
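To make issue 2 concrete, here is a minimal sketch (not the actual MapOutputTracker code; the names and sizes are made up) of how a check-then-serialize pattern lets several handler threads each build the same large byte array before anyone has cached it:

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative only: a naive status server where the cache lookup and the
// expensive serialization are not coordinated across handler threads.
object NaiveStatusServer {
  private val cache = new ConcurrentHashMap[Int, Array[Byte]]()

  // Hypothetical stand-in for serializing every MapStatus of a shuffle;
  // in the problem case this is tens of MB.
  private def serializeStatuses(shuffleId: Int): Array[Byte] =
    Array.fill(50 * 1024 * 1024)(0: Byte)

  def handleRequest(shuffleId: Int): Array[Byte] = {
    val cached = cache.get(shuffleId)
    if (cached != null) {
      cached
    } else {
      // Many threads can reach this point at once before the first one has
      // filled the cache, so each of them builds and holds its own copy.
      val bytes = serializeStatuses(shuffleId)
      cache.putIfAbsent(shuffleId, bytes)
      bytes
    }
  }
}
```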
This patch does a couple of things:
- When the serialized map output statuses are over a threshold (default 512 KB),
they are sent via a broadcast variable. This means we no longer send a large
serialized blob per request, so the memory bloat goes away: the RPC messages are
now in the 300-400 byte range and the map output statuses themselves are
broadcast. If the size is under the threshold it is sent as before, except the
message now carries a DIRECT indicator.
- Synchronize the incoming requests so that a single thread caches the
serialized output and creates the broadcast of the map output statuses, which
everyone else then reuses. This ensures we don't create multiple broadcast
variables when we don't need to. To make this work I added a second thread pool
that the Dispatcher hands the requests to, so those threads can block without
blocking the main dispatcher threads (which would otherwise hold up things like
heartbeats). A sketch of the combined approach follows this list.
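For reviewers, here is a hedged sketch of the combined approach; `minSizeForBroadcast`, the DIRECT/BROADCAST markers, and the serialization helpers are illustrative placeholders, not the exact names or wire format in the patch:

```scala
import org.apache.spark.SparkContext

class StatusServerSketch(sc: SparkContext, minSizeForBroadcast: Int = 512 * 1024) {

  private val DIRECT: Byte = 0
  private val BROADCAST: Byte = 1
  private var cachedReply: Option[Array[Byte]] = None

  // Called from a dedicated thread pool, so blocking here never stalls the
  // RPC dispatcher threads that also carry heartbeats.
  def getSerializedStatuses(statuses: Array[Array[Byte]]): Array[Byte] = synchronized {
    cachedReply.getOrElse {
      val direct = statuses.flatten            // stand-in for real serialization
      val reply =
        if (direct.length < minSizeForBroadcast) {
          // Small enough: send the bytes themselves, tagged DIRECT.
          DIRECT +: direct
        } else {
          // Large: broadcast the bytes once, and reply with a tiny message
          // telling executors to read the statuses from the broadcast variable.
          val bcast = sc.broadcast(direct)
          BROADCAST +: longToBytes(bcast.id)
        }
      cachedReply = Some(reply)
      reply
    }
  }

  private def longToBytes(id: Long): Array[Byte] =
    java.nio.ByteBuffer.allocate(java.lang.Long.BYTES).putLong(id).array()
}
```

The real patch caches per shuffle and invalidates the cached bytes and broadcast when map outputs change; this sketch only shows the threshold decision and the single-writer synchronization.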
Note that some of the design and code was contributed by @mridulm.
## How was this patch tested?
Unit tests and a lot of manual testing.
Ran with both the Akka and Netty RPC implementations, and with dynamic allocation on and off.
One of the large jobs I used to test this was a join over 15 TB of data with
200,000 map tasks and 20,000 reduce tasks, running on anywhere from 200 to
2,000 executors. With these changes the job ran successfully with 5 GB of
memory on the driver; without them I was using 20 GB and could only run 500
reduce tasks. The job has about 50 MB of serialized map output statuses, and
the executors took roughly the same amount of time to fetch them as before.
Also ran a variety of other jobs, from large wordcounts down to small ones
that don't use the broadcast path.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tgravescs/spark SPARK-1239
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/12113.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #12113
----
commit 8e4f2efea08b7013a2702543dc3860f9d277e3ac
Author: Thomas Graves <[email protected]>
Date: 2016-04-01T17:44:44Z
[SPARK-1239] Don't fetch all map output statuses at each reducer during
shuffles
commit 3c1def02b80ebfed55904e504609aa02de6559ce
Author: Thomas Graves <[email protected]>
Date: 2016-04-01T18:49:49Z
Update unit test
----