GitHub user andrewor14 opened a pull request:
https://github.com/apache/spark/pull/1165
[SPARK-1777] Prevent OOMs from single partitions
**Problem.** When caching, we currently unroll the entire RDD partition
before making sure we have enough free memory. This is a common cause of OOMs,
especially when (1) the BlockManager has little free space left in memory, and
(2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across
all threads, similar to the way we currently manage memory for shuffle
aggregation. Then, while unrolling each partition, we periodically check
whether there is enough space to continue. If not, we drop enough RDD blocks
to ensure at least `M` bytes are available, then try again. If there is still
not enough space to unroll the partition, we give up and drop the block to
disk directly, if applicable.
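To make the control flow concrete, here is a minimal, self-contained sketch of
that unrolling loop. All names (`unrollSafely`, `tryReserve`, `checkPeriod`)
and the callback-based wiring are illustrative assumptions, not the PR's
actual code:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: unroll a partition incrementally, periodically asking
// a shared pool for more memory instead of materializing the whole partition
// up front. Returns the unrolled values on success, or an iterator over
// everything (so the caller can spill to disk) on failure.
def unrollSafely(
    values: Iterator[Any],
    estimateSize: ArrayBuffer[Any] => Long,  // e.g. backed by size estimation
    tryReserve: Long => Boolean,             // may drop blocks before granting
    checkPeriod: Int = 16): Either[Array[Any], Iterator[Any]] = {
  val buffer = new ArrayBuffer[Any]
  var reserved = 0L
  var count = 0
  while (values.hasNext) {
    buffer += values.next()
    count += 1
    if (count % checkPeriod == 0) {
      val size = estimateSize(buffer)
      if (size > reserved && !tryReserve(size - reserved)) {
        // Still not enough space even after dropping blocks: give up and
        // hand everything back so the caller can drop the block to disk.
        return Right(buffer.iterator ++ values)
      }
      reserved = math.max(reserved, size)
    }
  }
  Left(buffer.toArray)
}
```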
**New configurations.**
- `spark.storage.bufferFraction` - the value of `M` as a fraction of the
storage memory (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size
estimation is slightly off. This is the equivalent of the existing
`spark.shuffle.safetyFraction`. (default: 0.9)
For more detail, see the [design
document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf).
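For a concrete reading of the two fractions, here is a sketch of how `M`
might be computed. It assumes the fractions compose multiplicatively with the
existing `spark.storage.memoryFraction` (default 0.6), mirroring how the
shuffle settings combine; the actual wiring may differ:

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch of deriving M from the configuration;
// not the PR's actual code.
val conf = new SparkConf()
val maxMemory = Runtime.getRuntime.maxMemory  // executor JVM heap
val storageMemory =
  maxMemory *
    conf.getDouble("spark.storage.memoryFraction", 0.6) *
    conf.getDouble("spark.storage.safetyFraction", 0.9)
// M: the portion of storage memory set aside for unrolling partitions.
val maxUnrollMemory =
  (storageMemory * conf.getDouble("spark.storage.bufferFraction", 0.2)).toLong
```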
Tests for performance and memory usage patterns are pending.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/andrewor14/spark them-rdd-memories
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1165.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1165
----
commit c12f093151c2c6cacf2c2cfade4dd1e9512d9edd
Author: Andrew Or <[email protected]>
Date: 2014-06-20T00:55:59Z
Add SizeTrackingAppendOnlyBuffer and tests
This buffer is supported by an underlying PrimitiveVector. It is to be
used for unrolling partitions, so we can efficiently check the size of
the in-memory buffer periodically.
Note that the underlying buffer cannot be an existing implementation of
a mutable Scala or Java collection. This is because we need visibility
into when the underlying array is resized; otherwise, size estimation
may not be accurate.
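A rough sketch of why resize visibility matters (illustrative only; the real
class wraps a PrimitiveVector and its estimation scheme will differ). Because
the buffer owns its backing array, it can re-run size estimation at exactly
the moments the array grows:

```scala
import scala.reflect.ClassTag

// Hypothetical size-tracking append-only buffer. The estimator is injected
// (e.g. something like Spark's size estimation) to keep the sketch
// self-contained.
class SizeTrackingBuffer[T: ClassTag](estimate: AnyRef => Long) {
  private var array = new Array[T](64)
  private var numElements = 0
  private var estimatedSize = estimate(array)

  def +=(value: T): Unit = {
    if (numElements == array.length) {
      // This resize is the event a stock ArrayBuffer would hide from us.
      val newArray = new Array[T](array.length * 2)
      System.arraycopy(array, 0, newArray, 0, numElements)
      array = newArray
      estimatedSize = estimate(array)  // re-estimate exactly on resize
    }
    array(numElements) = value
    numElements += 1
  }

  def numInserted: Int = numElements
  def currentSizeEstimate: Long = estimatedSize
}
```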
commit 97ea49900d21dac108d2b0f70395f5824d296724
Author: Andrew Or <[email protected]>
Date: 2014-06-20T01:53:42Z
Change BlockManager interface to use Arrays
... rather than ArrayBuffer. We only ever iterate through it anyway,
so there is really no reason for it to be a mutable buffer of any sort.
This change is introduced so that we can eventually directly pass our
SizeTrackingAppendOnlyBuffer's underlying array to BlockManager, instead
of having to awkwardly make it an ArrayBuffer first.
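In interface terms, the change amounts to roughly the following (a
hypothetical simplification; the real BlockManager#put takes additional
parameters such as the storage level):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplification of the signature change described above.
trait PutBefore {
  def put(blockId: String, values: ArrayBuffer[Any]): Unit
}
trait PutAfter {
  // An Array can now be passed straight through, e.g. the backing array
  // of a SizeTrackingAppendOnlyBuffer, with no intermediate copy.
  def put(blockId: String, values: Array[Any]): Unit
}
```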
commit bbd3eea466b284c828962db65248de349a2964da
Author: Andrew Or <[email protected]>
Date: 2014-06-20T19:24:37Z
Fix CacheManagerSuite to use Array
In addition, avoid using EasyMock for one of our tests, which expects
BlockManager#put to be called with an Array[Any] parameter. Even with
all the EasyMock matchers, it is impossible to match an Array[Any]
because of a combination of the following: (1) the fact that Arrays
are not covariant, (2) EasyMock provides `aryEq` matchers for all the
Java primitive types, which conflict with Any, and (3) EasyMock's super
general matchers like `anyObject` or `isA` also do not match for some
reason.
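Point (1) alone already rules out straightforward matching: Scala arrays are
invariant, so an Array[Int] simply is not an Array[Any]. For example:

```scala
val ints: Array[Int] = Array(1, 2, 3)
// val anys: Array[Any] = ints       // does not compile: arrays are invariant
val anys: Array[Any] = ints.toArray[Any]  // an explicit copy is required
```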
commit 776aec9e20f4a0c4beffe45ca07511bcd3fcba32
Author: Andrew Or <[email protected]>
Date: 2014-06-20T22:58:46Z
Prevent OOM if a single RDD partition is too large
The idea is to always use at least a fixed amount of memory (M bytes)
to unroll RDD partitions. This space is not reserved, but instead
allocated dynamically by dropping existing blocks when necessary.
We maintain this buffer as a global quantity shared across all cores.
The way we synchronize the usage of this buffer is very similar to the
way we share memory across all threads for shuffle aggregation. In
particular, each thread periodically and cautiously requests more
memory, and if there is not enough global memory to grant the request,
the thread concedes and spills. However, this relies on the accuracy of
size estimation, which is not guaranteed. Therefore, as in the shuffle
case, we need an equivalent spark.storage.safetyFraction in case size
estimation is slightly off.
We expose M to the user as spark.storage.bufferFraction, with a default
value of 0.2.
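As an illustration of the synchronization scheme described here, the
following is a minimal sketch; names like UnrollMemoryPool and tryReserve
are assumptions, not the PR's code:

```scala
import scala.collection.mutable

// Hypothetical sketch: a global pool of M bytes for unrolling. Threads
// reserve memory under a single lock and release their reservation once
// the partition is stored or dropped.
class UnrollMemoryPool(maxUnrollBytes: Long) {
  private val reservedByThread = mutable.HashMap.empty[Long, Long]

  def tryReserve(bytes: Long): Boolean = synchronized {
    val inUse = reservedByThread.values.sum
    if (inUse + bytes <= maxUnrollBytes) {
      val tid = Thread.currentThread().getId
      reservedByThread(tid) = reservedByThread.getOrElse(tid, 0L) + bytes
      true
    } else {
      false  // caller may drop cached blocks and retry, or concede and spill
    }
  }

  def releaseForThisThread(): Unit = synchronized {
    reservedByThread.remove(Thread.currentThread().getId)
  }
}
```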
commit f94f5af0091ffcb4871024e28c149bc8c73c3c87
Author: Andrew Or <[email protected]>
Date: 2014-06-21T01:50:00Z
Update a few comments (minor)
commit 6d05a81770e597dfca6b08c207c28c1d673ed86d
Author: Andrew Or <[email protected]>
Date: 2014-06-21T01:51:03Z
Merge branch 'master' of github.com:apache/spark into them-rdd-memories
----