GitHub user JoshRosen opened a pull request:
https://github.com/apache/spark/pull/10170
[SPARK-12165] Fix bug in eviction of storage memory by execution
This patch fixes a bug in the eviction of storage memory by execution.
## The bug:
In general, execution should be able to evict storage memory when the total
storage memory usage is greater than `maxMemory *
spark.memory.storageFraction`. Due to a bug, however, Spark might wind up
evicting no storage memory in certain cases where the storage memory usage was
between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For
example, here is a regression test which illustrates the bug:
```scala
val maxMemory = 1000L
val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
// Since we used the default storage fraction (0.5), we should be able to
// allocate 500 bytes of storage memory which are immune to eviction by
// execution memory pressure.
// Acquire enough storage memory to exceed the storage region size
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEvictBlocksToFreeSpaceNotCalled(ms)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
// At this point, storage is using 250 more bytes of memory than it is
// guaranteed, so execution should be able to reclaim up to 250 bytes of
// storage memory. Therefore, execution should now be able to require up to
// 500 bytes of memory:
assert(mm.acquireExecutionMemory(500L, taskAttemptId,
  MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
assert(mm.storageMemoryUsed === 500L)
assert(mm.executionMemoryUsed === 500L)
assertEvictBlocksToFreeSpaceCalled(ms, 250L)
```
The problem relates to the control flow / interaction between
`StorageMemoryPool.shrinkPoolToReclaimSpace()` and
`MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of
execution memory, the `UnifiedMemoryManager` discovers that it will need to
reclaim 250 bytes of memory from storage, so it calls
`StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls
`MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks
whether the requested space fits within `maxStorageMemory -
storageMemoryUsed`, which can be true whenever execution memory is free,
because `MemoryStore.maxStorageMemory = (maxMemory -
onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.
In that case `ensureFreeSpace()` returns without evicting any blocks, so
execution reclaims nothing from storage.
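The early exit described above can be reproduced with the numbers from the regression test. The following is only a rough sketch, not Spark's actual code: `EnsureFreeSpaceSketch` and `bytesToEvict` are hypothetical names, and the sketch assumes the check treats an exact fit as sufficient (`<=`).

```scala
object EnsureFreeSpaceSketch {
  // Hypothetical stand-in for the early-exit check; not Spark's actual code.
  // Returns how many bytes the store would try to evict for a request of
  // `space` bytes, given its view of the storage limit.
  // Assumption: an exact fit counts as "enough free space".
  def bytesToEvict(space: Long, maxStorageMemory: Long, storageMemoryUsed: Long): Long = {
    val free = maxStorageMemory - storageMemoryUsed
    if (space <= free) 0L else space - free
  }

  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L
    val executionMemoryUsed = 0L
    val storageMemoryUsed = 750L

    // With the UnifiedMemoryManager, the store's limit includes all unused
    // execution memory, so the store believes it has 250 bytes free.
    val maxStorageMemory = maxMemory - executionMemoryUsed
    println(bytesToEvict(250L, maxStorageMemory, storageMemoryUsed)) // prints 0

    // If the limit were the storage pool's current size (750 bytes, all of it
    // in use), the same request would require evicting 250 bytes.
    println(bytesToEvict(250L, 750L, storageMemoryUsed)) // prints 250
  }
}
```

With zero bytes evicted, the storage pool cannot shrink, which matches the regression test receiving only the 250 bytes that execution already had available.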
The control flow here is somewhat confusing; it grew messy over time as
several components were merged and refactored. In the pre-Spark 1.6 code,
`ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in
1.6 it is part of a circular callback flow: `MemoryStore` calls
`MemoryManager.acquireStorageMemory`, which then calls back into
`MemoryStore.ensureFreeSpace`, which, in turn, calls
`MemoryManager.freeStorageMemory`.
## The solution:
The solution implemented in this patch is to remove the confusing circular
control flow between `MemoryManager` and `MemoryStore`, making the storage
memory acquisition process much more linear / straightforward. The key changes:
- Remove a layer of inheritance which made the memory manager code harder
to understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
- Move some bounds checks earlier in the call chain
(13ba7ada77f87ef1ec362aec35c89a924e6987cb).
- Refactor `ensureFreeSpace()` so that the part which evicts blocks can be
called independently from the part which checks whether there is enough free
space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
- Realize that this lets us remove a layer of overloads from
`ensureFreeSpace` (eec4f6c87423d5e482b710e098486b3bbc4daf06).
- Realize that `ensureFreeSpace()` can simply be replaced with an
`evictBlocksToFreeSpace()` method which is called [after we've already figured
out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88)
how much memory needs to be reclaimed via eviction
(2dc842aea82c8895125d46a00aa43dfb0d121de9).
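To illustrate the more linear flow, here is a simplified sketch in which the pool first works out how much must come from eviction and only then asks for exactly that much to be evicted. It is a toy model under stated assumptions, not the code in this patch: `StoragePoolModel` and `shrinkPool` are hypothetical names, and the private `evictBlocksToFreeSpace` here just subtracts bytes instead of dropping real blocks.

```scala
object LinearEvictionSketch {
  // Hypothetical, simplified model of a storage pool; names are illustrative.
  final class StoragePoolModel(var poolSize: Long, var memoryUsed: Long) {
    def memoryFree: Long = poolSize - memoryUsed

    // Stand-in for block eviction: frees up to `space` bytes of used memory
    // and returns the number of bytes actually freed.
    private def evictBlocksToFreeSpace(space: Long): Long = {
      val freed = math.min(space, memoryUsed)
      memoryUsed -= freed
      freed
    }

    // Decide up front how much must come from eviction, then evict exactly
    // that much. Returns the number of bytes by which the pool shrank.
    def shrinkPool(spaceToFree: Long): Long = {
      val fromFreeMemory = math.min(spaceToFree, memoryFree)
      val stillNeeded = spaceToFree - fromFreeMemory
      val fromEviction = if (stillNeeded > 0) evictBlocksToFreeSpace(stillNeeded) else 0L
      val reclaimed = fromFreeMemory + fromEviction
      poolSize -= reclaimed
      reclaimed
    }
  }

  def main(args: Array[String]): Unit = {
    // Same state as the regression test: a 750-byte pool that is fully used.
    val pool = new StoragePoolModel(poolSize = 750L, memoryUsed = 750L)
    println(pool.shrinkPool(250L)) // prints 250
    println(pool.memoryUsed)       // prints 500
  }
}
```

Because the eviction step no longer re-checks free space against a limit that includes unused execution memory, it cannot decide on its own that no eviction is needed.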
Along the way, I fixed some problems with the mocks in
`MemoryManagerSuite`: the old mocks would
[unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84)
report that a block had been evicted even if there was enough space in the
storage pool such that eviction would be avoided.
I also fixed a problem where `StorageMemoryPool._memoryUsed` might become
negative due to freed memory being double-counted when execution evicts storage.
The problem was that `StorageMemoryPool.shrinkPoolToFreeSpace` would [decrement
`_memoryUsed`](https://github.com/apache/spark/commit/7c68ca09cb1b12f157400866983f753ac863380e#diff-935c68a9803be144ed7bafdd2f756a0fL133)
even though `StorageMemoryPool.freeMemory` had already decremented it as each
evicted block was freed.
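The double-counting can be seen in a small model. This is a sketch under assumptions rather than the real `StorageMemoryPool`: `PoolModel`, `buggyShrink`, and `fixedShrink` are hypothetical names, and eviction is reduced to a list of block sizes.

```scala
object DoubleCountSketch {
  // Hypothetical model of the pool's usage counter; not Spark's actual class.
  final class PoolModel(var memoryUsed: Long) {
    // Called once for each evicted block as it is freed.
    def freeMemory(size: Long): Unit = memoryUsed -= size

    // Evicts the given blocks, freeing each one, and returns the total freed.
    private def evict(blockSizes: Seq[Long]): Long = {
      blockSizes.foreach(freeMemory)
      blockSizes.sum
    }

    // Buggy variant: subtracts the freed bytes a second time after eviction
    // has already decremented the counter block by block.
    def buggyShrink(blockSizes: Seq[Long]): Unit = {
      val freed = evict(blockSizes)
      memoryUsed -= freed
    }

    // Fixed variant: relies only on the per-block decrement.
    def fixedShrink(blockSizes: Seq[Long]): Unit = {
      evict(blockSizes)
      ()
    }
  }

  def main(args: Array[String]): Unit = {
    val buggy = new PoolModel(300L)
    buggy.buggyShrink(Seq(100L, 100L))
    println(buggy.memoryUsed) // prints -100

    val fixed = new PoolModel(300L)
    fixed.fixedShrink(Seq(100L, 100L))
    println(fixed.memoryUsed) // prints 100
  }
}
```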
## TODOs
- [ ] Add stronger assertions or a dedicated regression test for the
`_memoryUsed < 0` bug, which was uncovered while testing this patch and a
related fix for SPARK-12155.
- [ ] See whether we can now remove the confusing `MemoryStore.maxMemory`
method.
/cc @andrewor14 @zsxwing @yhuai @rxin
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/JoshRosen/spark SPARK-12165
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/10170.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #10170
----
commit 012cb4ba9ba1e32337f2f7bb612abb53fe4070be
Author: Josh Rosen <[email protected]>
Date: 2015-12-06T20:57:45Z
Reset ensureFreeSpaceCalled before each test.
This reduces coupling between failed tests.
commit b519fe628a9a2b8238dfedbfd9b74bdd2ddc0de4
Author: Josh Rosen <[email protected]>
Date: 2015-12-07T00:00:23Z
Add regression test for storage eviction bug.
commit 7c68ca09cb1b12f157400866983f753ac863380e
Author: Josh Rosen <[email protected]>
Date: 2015-12-06T22:40:07Z
Add MemoryStore.freeSpaceForExecution() method, which forces blocks to be
dropped.
Previously, ensureFreeSpace() might end up not dropping blocks if the total
storage memory pool usage was less than the maximum possible storage pool
usage.
commit 53841174760a24a0df3eb1562af1f33dbe340eb9
Author: Josh Rosen <[email protected]>
Date: 2015-12-07T01:47:29Z
Remove a layer of confusing inheritance.
commit 13ba7ada77f87ef1ec362aec35c89a924e6987cb
Author: Josh Rosen <[email protected]>
Date: 2015-12-07T01:53:45Z
Put fail-fast for non-fitting blocks earlier in call chain.
commit eec4f6c87423d5e482b710e098486b3bbc4daf06
Author: Josh Rosen <[email protected]>
Date: 2015-12-07T01:56:48Z
Collapse ensureFreeSpace overloads
commit 2dc842aea82c8895125d46a00aa43dfb0d121de9
Author: Josh Rosen <[email protected]>
Date: 2015-12-07T02:56:11Z
Replace ensureFreeSpace() with evictBlocksToFreeSpace().
commit 0eac7da041326cfbec2c1db2f279cb655744f90e
Author: Josh Rosen <[email protected]>
Date: 2015-12-07T03:39:37Z
Update JIRA in test case.
----