GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/10170

    [SPARK-12165] Fix bug in eviction of storage memory by execution

    This patch fixes a bug in the eviction of storage memory by execution.
    
    ## The bug:
    
    In general, execution should be able to storage memory when the total 
storage memory usage is greater than `maxMemory * 
spark.memory.storageFraction`. Due to a bug, however, Spark might wind up 
evicting no storage memory in certain cases where the storage memory usage was 
between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For 
example, here is a regression test which illustrates the bug:
    
    ```scala
        val maxMemory = 1000L
        val taskAttemptId = 0L
        val (mm, ms) = makeThings(maxMemory)
        // Since we used the default storage fraction (0.5), we should be able 
to allocate 500 bytes
        // of storage memory which are immune to eviction by execution memory 
pressure.
    
        // Acquire enough storage memory to exceed the storage region size
        assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
        assertEvictBlocksToFreeSpaceNotCalled(ms)
        assert(mm.executionMemoryUsed === 0L)
        assert(mm.storageMemoryUsed === 750L)
     
        // At this point, storage is using 250 more bytes of memory than it is 
guaranteed, so execution
        // should be able to reclaim up to 250 bytes of storage memory.
        // Therefore, execution should now be able to require up to 500 bytes 
of memory:
        assert(mm.acquireExecutionMemory(500L, taskAttemptId, 
MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
        assert(mm.storageMemoryUsed === 500L)
        assert(mm.executionMemoryUsed === 500L)
        assertEvictBlocksToFreeSpaceCalled(ms, 250L)
    ```
    
    The problem relates to the control flow / interaction between 
`StorageMemoryPool.shrinkPoolToReclaimSpace()` and 
`MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of 
execution memory, the `UnifiedMemoryManager` discovers that it will need to 
reclaim 250 bytes of memory from storage, so it calls 
`StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls 
`MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks 
whether the requested space is less than `maxStorageMemory - 
storageMemoryUsed`, which will be true if there is any free execution memory 
because it turns out that `MemoryStore.maxStorageMemory = (maxMemory - 
onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.
    
    The control flow here is somewhat confusing (it grew to be messy / 
confusing over time / as a result of the merging / refactoring of several 
components). In the pre-Spark 1.6 code, `ensureFreeSpace` was called directly 
by the `MemoryStore` itself, whereas in 1.6 it's involved in a confusing 
callback flow where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, 
which then calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls 
`MemoryManager.freeStorageMemory`.
    
    ## The solution:
    
    The solution implemented in this patch is to remove the confusing circular 
control flow between `MemoryManager` and `MemoryStore`, making the storage 
memory acquisition process much more linear / straightforward. The key changes:
    
    - Remove a layer of inheritance which made the memory manager code harder 
to understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
    - Move some bounds checks earlier in the call chain 
(13ba7ada77f87ef1ec362aec35c89a924e6987cb).
    - Refactor `ensureFreeSpace()` so that the part which evicts blocks can be 
called independently from the part which checks whether there is enough free 
space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
    - Realize that this lets us remove a layer of overloads from 
`ensureFreeSpace` (eec4f6c87423d5e482b710e098486b3bbc4daf06).
    - Realize that `ensureFreeSpace()` can simply be replaced with an 
`evictBlocksToFreeSpace()` method which is called [after we've already figured 
out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88)
 how much memory needs to be reclaimed via eviction; 
(2dc842aea82c8895125d46a00aa43dfb0d121de9).
    
    Along the way, I fixed some problems with the mocks in 
`MemoryManagerSuite`: the old mocks would 
[unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84)
 report that a block had been evicted even if there was enough space in the 
storage pool such that eviction would be avoided.
    
    I also fixed a problem where `StorageMemoryPool._memoryUsed` might become 
negative due to freed memory being double-counted when excution evicts storage. 
The problem was that `StorageMemoryPoolshrinkPoolToFreeSpace` would [decrement 
`_memoryUsed`](https://github.com/apache/spark/commit/7c68ca09cb1b12f157400866983f753ac863380e#diff-935c68a9803be144ed7bafdd2f756a0fL133)
 even though `StorageMemoryPool.freeMemory` had already decremented it as each 
evicted block was freed.
    
    ## TODOs
    
    - [ ] Add stronger assertions or a dedicated regression test for the 
`_memoryUsed < 0` bug, which was uncovered while testing this patch and a 
related fix for SPARK-12155.
    - [ ] See whether we can now remove the confusing `MemoryStore.maxMemory` 
method.
    
    /cc @andrewor14 @zsxwing @yhuai @rxin 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-12165

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10170
    
----
commit 012cb4ba9ba1e32337f2f7bb612abb53fe4070be
Author: Josh Rosen <[email protected]>
Date:   2015-12-06T20:57:45Z

    Reset ensureFreeSpaceCalled before each test.
    
    This reduces coupling between failed tests.

commit b519fe628a9a2b8238dfedbfd9b74bdd2ddc0de4
Author: Josh Rosen <[email protected]>
Date:   2015-12-07T00:00:23Z

    Add regression test for storage eviction bug.

commit 7c68ca09cb1b12f157400866983f753ac863380e
Author: Josh Rosen <[email protected]>
Date:   2015-12-06T22:40:07Z

    Add MemoryStore.freeSpaceForExecution() method, which forces blocks to be 
dropped.
    
    Previously, ensureFreeSpace() might end up not dropping blocks if the total
    storage memory pool usage was less than the maximum possible storage pool 
usage.

commit 53841174760a24a0df3eb1562af1f33dbe340eb9
Author: Josh Rosen <[email protected]>
Date:   2015-12-07T01:47:29Z

    Remove a layer of confusing inheritance.

commit 13ba7ada77f87ef1ec362aec35c89a924e6987cb
Author: Josh Rosen <[email protected]>
Date:   2015-12-07T01:53:45Z

    Put fail-fast for non-fitting blocks earlier in call chain.

commit eec4f6c87423d5e482b710e098486b3bbc4daf06
Author: Josh Rosen <[email protected]>
Date:   2015-12-07T01:56:48Z

    Collapse ensureFreeSpace overloads

commit 2dc842aea82c8895125d46a00aa43dfb0d121de9
Author: Josh Rosen <[email protected]>
Date:   2015-12-07T02:56:11Z

    Replace ensureFreeSpace() with evictBlocksToFreeSpace().

commit 0eac7da041326cfbec2c1db2f279cb655744f90e
Author: Josh Rosen <[email protected]>
Date:   2015-12-07T03:39:37Z

    Update JIRA in test case.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to