GitHub user nkronenfeld opened a pull request:
https://github.com/apache/spark/pull/3570
Clear local copies of accumulators as soon as we're done with them
I suspect this PR is not quite ready for merging, but opening it seemed
the best way of getting a few comments on it.
A little explanation:
I'm working with some rather large accumulators. I've been asked why I don't
just use a reducer; the answer is that when this works, it is about 10x
as fast.
And so far, when it doesn't work, it seems improvable. This is my first
attempt at such improvement.
As far as I can tell, on each worker, each time a task is run, the
following happens:
1. The global accumulator object is cleared on that thread
2. The current individual accumulators are registered on that thread (as
copies)
3. When the task completes, the accumulator values are collected and
returned.
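To make the sequence above concrete, here is a minimal sketch of it. It is a
simplified model of the behavior as I understand it, not Spark's actual code:
the object `LocalAccumulatorModel` and all of its methods are hypothetical
names, and accumulator values are reduced to plain `Long`s.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of the per-thread bookkeeping described above.
// None of these names come from Spark's source.
object LocalAccumulatorModel {
  // Each worker thread holds its own map of accumulator id -> local value.
  private val localValues = new ThreadLocal[mutable.Map[Long, Long]] {
    override def initialValue(): mutable.Map[Long, Long] =
      mutable.Map.empty[Long, Long]
  }

  // Step 1: drop whatever the previous task left behind on this thread.
  def clear(): Unit = localValues.get().clear()

  // Step 2: register a fresh local copy for this task.
  def register(id: Long): Unit = localValues.get().update(id, 0L)

  // Called by task code to update a registered accumulator.
  def add(id: Long, delta: Long): Unit = {
    val local = localValues.get()
    local.update(id, local(id) + delta)
  }

  // Step 3: collect the values to send back. Nothing is cleared here.
  def values: Map[Long, Long] = localValues.get().toMap

  // Number of local copies the current thread is still holding.
  def retained: Int = localValues.get().size

  // Runs the three steps in order around a task body.
  def runTask(ids: Seq[Long])(body: => Unit): Map[Long, Long] = {
    clear()
    ids.foreach(register)
    body
    values
  }
}
```

In this model, `retained` still reports every registered accumulator after
`runTask` has returned, and it stays that way until the next call to `clear()`
on the same thread.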
Note that when this is done, nothing is cleared. This means that if one is
using large accumulators, those accumulators keep occupying memory to no
purpose until the next task is run on that thread.
And if a thread somehow dies, that memory stays in use and can't be reclaimed.
And, as far as I can tell, that thread-local value will never be used again.
All I've done so far is to clear local values for that thread as it returns
them.
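The idea of clearing as the values are returned can be sketched roughly as
follows. This is an illustration only, not the code in this PR: the object
`ClearingOnReturn` and its methods are hypothetical, and each accumulator is
modeled as a plain `Array[Long]` standing in for a large value.

```scala
import scala.collection.mutable

// Hypothetical sketch; the names are illustrative and not Spark's API.
object ClearingOnReturn {
  // Per-thread map of accumulator id -> (potentially large) local buffer.
  private val localValues = new ThreadLocal[mutable.Map[Long, Array[Long]]] {
    override def initialValue(): mutable.Map[Long, Array[Long]] =
      mutable.Map.empty[Long, Array[Long]]
  }

  // Register a zeroed local buffer of the given size.
  def register(id: Long, size: Int): Unit =
    localValues.get().update(id, new Array[Long](size))

  // Add delta to one slot of a registered buffer.
  def add(id: Long, index: Int, delta: Long): Unit = {
    val buf = localValues.get()(id)
    buf(index) += delta
  }

  // Hand the values back and drop the thread's references in the same step,
  // so the buffers can be garbage collected once the caller is done with them.
  def takeValues(): Map[Long, Array[Long]] = {
    val local = localValues.get()
    val snapshot = local.toMap
    local.clear()
    snapshot
  }

  // Number of local buffers the current thread is still holding.
  def retained: Int = localValues.get().size
}
```

After `takeValues()` the thread holds no buffers, so an idle thread no longer
pins the memory while it waits for its next task.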
I would like to do 2 more things:
1. Reduce this to a two-step process: register, and return, both clearing
values
2. Put in something to make sure values are cleared if a task dies (or a
thread too, if that is possible)
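One possible shape for these two follow-ups is sketched below, assuming the
cleanup can sit in a `finally` block around the task body. Nothing here is
implemented in this PR or taken from Spark: `SafeTaskRunner` and its methods
are hypothetical names, and values are again plain `Long`s.

```scala
import scala.collection.mutable

// Hypothetical sketch of "register, then return, both clearing values",
// with cleanup that also runs when the task body throws.
object SafeTaskRunner {
  private val localValues = new ThreadLocal[mutable.Map[Long, Long]] {
    override def initialValue(): mutable.Map[Long, Long] =
      mutable.Map.empty[Long, Long]
  }

  // Called by task code to update a registered accumulator.
  def add(id: Long, delta: Long): Unit = {
    val local = localValues.get()
    local.update(id, local(id) + delta)
  }

  // Number of local copies the current thread is still holding.
  def retained: Int = localValues.get().size

  def runTask[T](ids: Seq[Long])(body: => T): (T, Map[Long, Long]) = {
    // Register step: clear stale values, then register fresh copies.
    val local = localValues.get()
    local.clear()
    ids.foreach(id => local.update(id, 0L))
    try {
      val result = body
      // Return step: snapshot the values for the caller.
      (result, local.toMap)
    } finally {
      // Runs on success and on failure: drop this thread's entry entirely.
      localValues.remove()
    }
  }
}
```

The `finally` block covers a task that dies with an exception. It does not by
itself cover a thread that is killed outright, which would need a separate
mechanism.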
This seems so simple so far, though, that I was hoping to get some
confirmation that I understand things correctly before going on.
I don't have a PR or JIRA issue on this so far, because the issues I see are
a bit hard to pin down. When I run my application without this change, it
works but is very fragile: some tasks take 6 seconds, some 4 minutes. With
this change, everything seems to run smoothly, and I stop getting random,
large task times.
This is somewhat related to JIRA issue SPARK-664 in that they both deal
with accumulator performance, but it doesn't really address that directly.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/nkronenfeld/spark-1 Accumulator-Improvements
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/3570.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3570
----
commit 39a82f2b3ccf68a9a4d1abb0561b5680278a2610
Author: Nathan Kronenfeld <[email protected]>
Date: 2014-12-03T04:11:11Z
Clear local copies of accumulators as soon as we're done with them
----