GitHub user andrewor14 opened a pull request:

    https://github.com/apache/spark/pull/1165

    [SPARK-1777] Prevent OOMs from single partitions

    **Problem.** When caching, we currently unroll the entire RDD partition 
before making sure we have enough free memory. This is a common cause of OOMs, 
especially when (1) the BlockManager has little free space left in memory, and 
(2) the partition is large.
    
    **Solution.** We maintain a global memory pool of `M` bytes shared across 
all threads, similar to the way we currently manage memory for shuffle 
aggregation. Then, while unrolling each partition, we periodically check 
whether there is enough space to continue. If not, we drop enough RDD blocks to 
ensure we have at least `M` bytes to work with, then try again. If there is 
still not enough space to unroll the partition, we give up and drop the block 
to disk directly, if applicable.
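
    In sketch form (a minimal illustration of the scheme above; the pool 
class and every name below are ours for illustration, not the actual code in 
this patch):

        import scala.reflect.ClassTag

        // A shared pool of M bytes. The real scheme would first try to drop
        // existing RDD blocks to free space rather than simply refusing.
        class UnrollMemoryPool(val maxBytes: Long) {
          private var used = 0L
          def tryReserve(bytes: Long): Long = synchronized {
            val granted = math.min(bytes, maxBytes - used)
            used += granted
            granted
          }
        }

        // Unroll a partition, periodically checking the size estimate against
        // what we have reserved. On failure, hand back an iterator so the
        // caller can drop the block to disk if applicable.
        def unrollSafely[T: ClassTag](
            values: Iterator[T],
            pool: UnrollMemoryPool,
            sizeOf: Seq[T] => Long): Either[Array[T], Iterator[T]] = {
          val buffer = scala.collection.mutable.ArrayBuffer.empty[T]
          var reserved = pool.tryReserve(1L << 20)   // initial 1 MB reservation
          var keepUnrolling = reserved > 0
          var count = 0L
          while (keepUnrolling && values.hasNext) {
            buffer += values.next()
            count += 1
            if (count % 16 == 0) {                   // periodic, not per-element
              val size = sizeOf(buffer)
              if (size > reserved) {
                reserved += pool.tryReserve(size * 2 - reserved)
                keepUnrolling = reserved >= size
              }
            }
          }
          if (keepUnrolling) Left(buffer.toArray)    // fully unrolled in memory
          else Right(buffer.iterator ++ values)      // caller may spill to disk
        }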
    
    **New configurations.**
    - `spark.storage.bufferFraction` - the value of `M` as a fraction of the 
storage memory (default: 0.2)
    - `spark.storage.safetyFraction` - a margin of safety in case size 
estimation is slightly off; the equivalent of the existing 
`spark.shuffle.safetyFraction` (default: 0.9)
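
    Under these defaults, and assuming the storage memory is the heap times 
the existing `spark.storage.memoryFraction` (default: 0.6), the arithmetic 
works out as follows (illustrative only; none of these variable names are in 
the patch):

        val maxMemory      = Runtime.getRuntime.maxMemory   // executor JVM heap
        val memoryFraction = 0.6   // spark.storage.memoryFraction (existing)
        val bufferFraction = 0.2   // spark.storage.bufferFraction (this patch)
        val safetyFraction = 0.9   // spark.storage.safetyFraction (this patch)
        val unrollBytes =
          (maxMemory * memoryFraction * bufferFraction * safetyFraction).toLong
        // e.g. a 10 GB heap yields 10 * 0.6 * 0.2 * 0.9 ~= 1.08 GB of unroll space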
    
    For more detail, see the [design 
document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf).
 Tests for performance and memory usage patterns are pending.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andrewor14/spark them-rdd-memories

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1165.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1165
    
----
commit c12f093151c2c6cacf2c2cfade4dd1e9512d9edd
Author: Andrew Or <[email protected]>
Date:   2014-06-20T00:55:59Z

    Add SizeTrackingAppendOnlyBuffer and tests
    
    This buffer is backed by an underlying PrimitiveVector. It is to be
    used for unrolling partitions, so that we can efficiently check the
    size of the in-memory buffer periodically.
    
    Note that the underlying buffer cannot be an existing implementation
    of a mutable Scala or Java collection. This is because we need to know
    when the underlying array is resized; otherwise, size estimation may
    not be accurate.
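    
    The idea in miniature (a hypothetical stand-in, not the actual class;
    the size estimator is passed in as a function here):
    
        // Because the buffer owns its growth logic, it can re-estimate its
        // size exactly when the backing array is reallocated: the event a
        // stock Scala or Java collection would hide from us.
        class SizeTrackingBuffer[T <: AnyRef](estimate: AnyRef => Long) {
          private var array = new Array[AnyRef](64)
          private var numElements = 0
          private var estimatedBytes = estimate(array)
    
          def +=(value: T): Unit = {
            if (numElements == array.length) {
              val bigger = new Array[AnyRef](array.length * 2)
              System.arraycopy(array, 0, bigger, 0, numElements)
              array = bigger
              estimatedBytes = estimate(array)   // re-estimate only on resize
            }
            array(numElements) = value
            numElements += 1
          }
    
          def estimateSize: Long = estimatedBytes
          def size: Int = numElements
        }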

commit 97ea49900d21dac108d2b0f70395f5824d296724
Author: Andrew Or <[email protected]>
Date:   2014-06-20T01:53:42Z

    Change BlockManager interface to use Arrays
    
    ... rather than ArrayBuffer. We only ever iterate through it anyway,
    so there is really no reason for it to be a mutable buffer of any sort.
    This change is introduced so that we can eventually pass our
    SizeTrackingAppendOnlyBuffer's underlying array directly to
    BlockManager, instead of having to awkwardly wrap it in an
    ArrayBuffer first.

commit bbd3eea466b284c828962db65248de349a2964da
Author: Andrew Or <[email protected]>
Date:   2014-06-20T19:24:37Z

    Fix CacheManagerSuite to use Array
    
    In addition, avoid using EasyMock for one of our tests, which expects
    BlockManager#put to be called with an Array[Any] parameter. Even with
    all of the EasyMock matchers, it is impossible to match an Array[Any]
    because of a combination of the following: (1) Scala arrays are not
    covariant, (2) EasyMock provides `aryEq` matchers for all the Java
    primitive types, which conflict with Any, and (3) EasyMock's more
    general matchers like `anyObject` and `isA` also fail to match, for
    reasons unclear.
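    
    Point (1) in isolation (a self-contained illustration, not code from
    this patch):
    
        // Scala arrays are invariant, unlike Java's, so an Array[String]
        // cannot be passed where an Array[Any] is expected.
        val strings: Array[String] = Array("a", "b")
        // val anys: Array[Any] = strings    // does not compile: type mismatch
        val anys: Array[Any] = strings.toArray[Any]   // explicit copy required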

commit 776aec9e20f4a0c4beffe45ca07511bcd3fcba32
Author: Andrew Or <[email protected]>
Date:   2014-06-20T22:58:46Z

    Prevent OOM if a single RDD partition is too large
    
    The idea is to always use at least a fixed amount of memory (M bytes)
    to unroll RDD partitions. This space is not reserved, but instead
    allocated dynamically by dropping existing blocks when necessary.
    
    We maintain this buffer as a global quantity shared across all cores.
    The way we synchronize the usage of this buffer is very similar to the
    way we share memory across all threads for shuffle aggregation. In
    particular, each thread cautiously requests more memory periodically,
    and if there is not enough global memory to grant the request, the
    thread concedes and spills. However, this relies on the accuracy of
    size estimation, which is not guaranteed. Therefore, as in the shuffle
    case, we need an equivalent spark.storage.safetyFraction in case size
    estimation is slightly off.
    
    We expose M to the user as spark.storage.bufferFraction, with a default
    value of 0.2.
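    
    A minimal sketch of this request/concede protocol (all names here are
    hypothetical, and dropping existing blocks on denial is elided):
    
        // Each thread tracks its reservation in a shared map guarded by a lock.
        object UnrollMemoryTracker {
          private val bytesPerThread =
            scala.collection.mutable.Map.empty[Long, Long]
          private val maxBytes = 512L << 20   // M, as derived from bufferFraction
    
          // Returns true if `bytes` more were granted to the calling thread.
          def requestMore(threadId: Long, bytes: Long): Boolean = synchronized {
            if (bytesPerThread.values.sum + bytes <= maxBytes) {
              bytesPerThread(threadId) =
                bytesPerThread.getOrElse(threadId, 0L) + bytes
              true
            } else {
              false   // the caller concedes: stop unrolling and spill
            }
          }
    
          // Release everything once the thread finishes caching its partition.
          def releaseAll(threadId: Long): Unit = synchronized {
            bytesPerThread -= threadId
          }
        }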

commit f94f5af0091ffcb4871024e28c149bc8c73c3c87
Author: Andrew Or <[email protected]>
Date:   2014-06-21T01:50:00Z

    Update a few comments (minor)

commit 6d05a81770e597dfca6b08c207c28c1d673ed86d
Author: Andrew Or <[email protected]>
Date:   2014-06-21T01:51:03Z

    Merge branch 'master' of github.com:apache/spark into them-rdd-memories

----

