GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/11874

    [SPARK-3000] Allow MemoryStore blocks to be dropped to disk in parallel

    This patch refactors the BlockManager and MemoryManager 
locking/synchronization logic in order to allow MemoryStore blocks to be 
dropped to the disk in parallel, significantly reducing `MemoryManager` lock 
contention when acquiring storage memory.
    
    ## Key change
    
    Prior to this patch, `MemoryStore.evictBlocksToFreeSpace()` synchronized on 
`memoryManager` for the entire block-dropping process. This patch reduces the 
scope of the synchronization to only cover the phase in which we are selecting 
the set of blocks to be evicted. Once a task has committed to dropping a set of 
blocks, it now releases the `memoryManager` lock and performs the block 
eviction without holding any locks. This allows multiple tasks to drop blocks 
to disk in parallel and avoids blocking non-spilling tasks via contention on 
the `memoryManager` lock.
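    
    The narrowed locking can be illustrated with a minimal sketch. This is 
    not the code from this patch: `SketchStore`, `evictToFree`, and 
    `dropToDisk` are hypothetical names, and removing the victims from the map 
    during selection stands in for the per-block bookkeeping that the real 
    `MemoryStore` and `BlockManager` perform.
    
    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified stand-in for a memory store (not Spark's classes).
    final class SketchStore(lock: AnyRef) {
      // blockId -> size in bytes, in insertion order (oldest first).
      private val entries = mutable.LinkedHashMap.empty[String, Long]

      def put(id: String, size: Long): Unit = lock.synchronized { entries(id) = size }

      /** Evicts blocks until at least `space` bytes are freed; returns bytes freed. */
      def evictToFree(space: Long)(dropToDisk: (String, Long) => Unit): Long = {
        // Phase 1: pick the victims while holding the shared lock.
        val victims: Seq[(String, Long)] = lock.synchronized {
          val selected = mutable.ArrayBuffer.empty[(String, Long)]
          var freed = 0L
          val it = entries.iterator
          while (freed < space && it.hasNext) {
            val e = it.next()
            selected += e
            freed += e._2
          }
          if (freed >= space) {
            // Commit: hide the victims from other tasks' selections.
            selected.foreach { case (id, _) => entries.remove(id) }
            selected.toList
          } else {
            Nil // Not enough evictable memory; drop nothing.
          }
        }
        // Phase 2: the slow disk writes run without holding the shared lock,
        // so other tasks can select and drop their own blocks concurrently.
        victims.foreach { case (id, size) => dropToDisk(id, size) }
        victims.map(_._2).sum
      }
    }
    ```
    
    In this sketch only the selection loop is serialized; the call to 
    `dropToDisk`, which is where the time goes, happens after the lock has 
    been released.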
    
    One tricky concern is preventing tasks from stealing the freed storage 
memory which results from eviction. To address this, this patch modifies the 
eviction path so that evicted blocks' memory is not immediately released to the 
memory manager. Once all of a task's block evictions have completed, the task 
compares the total amount of memory freed by eviction (which could exceed 
its request size) to the amount of memory needed and either acquires more 
memory or frees the excess storage memory.
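    
    The final comparison amounts to a small piece of arithmetic, sketched 
    below under assumed names (`EvictionAccounting`, `reconcile`, and the 
    `Adjustment` cases are illustrative and do not appear in the patch):
    
    ```scala
    // Hypothetical helper illustrating the post-eviction accounting step.
    object EvictionAccounting {
      sealed trait Adjustment
      final case class AcquireMore(bytes: Long) extends Adjustment
      final case class ReleaseExcess(bytes: Long) extends Adjustment
      case object Balanced extends Adjustment

      /** Compares the bytes a task needs with the bytes its evictions freed. */
      def reconcile(needed: Long, freedByEviction: Long): Adjustment =
        if (freedByEviction < needed) AcquireMore(needed - freedByEviction)
        else if (freedByEviction > needed) ReleaseExcess(freedByEviction - needed)
        else Balanced
    }
    ```
    
    For example, a task that needs 100 bytes but evicted a 130-byte block 
    would hand the surplus 30 bytes back to the storage pool.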
    
    Another concern is making sure that a single block isn't double-evicted by 
multiple tasks. Double-eviction is prevented by the fact that tasks obtain 
exclusive write locks on all of the blocks that they will evict and the fact 
that the process of selecting which blocks to evict is still synchronized on 
the `memoryManager`.
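    
    A rough sketch of why these two facts together rule out double-eviction 
    follows. `VictimSelector` is a hypothetical class, and its `writeLocked` 
    set is a simplified stand-in for the real per-block write locks:
    
    ```scala
    import scala.collection.mutable

    // Hypothetical selector: selection is serialized, and a block that is
    // already claimed (write-locked) by one task is skipped by every other task.
    final class VictimSelector(blocks: List[String]) {
      private val writeLocked = mutable.Set.empty[String]

      /** Claims up to `n` blocks that no other task has claimed. */
      def select(n: Int): List[String] = synchronized {
        val chosen = blocks.filterNot(writeLocked.contains).take(n)
        writeLocked ++= chosen
        chosen
      }

      /** Releases claims, e.g. once the blocks have been dropped. */
      def release(ids: Seq[String]): Unit = synchronized { writeLocked --= ids }
    }
    ```
    
    Because claiming happens inside the synchronized selection step, two 
    tasks calling `select` can never receive overlapping sets of blocks.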
    
    Note that a single task will still spill blocks sequentially, so 
parallelism only results from multiple tasks triggering separate block 
evictions. If we want to enable a single task's evictions to proceed in 
parallel then we can do that in a separate followup patch (it should be a 
fairly straightforward change).
    
    ## Motivating performance results
    
    This patch is motivated by a simple `spark-shell` benchmark that I wrote to 
stress the BlockManager's block eviction path:
    
    ```scala
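    import org.apache.spark.storage.StorageLevel
    val startTime = System.currentTimeMillis
    // Define 100 RDDs, each holding 1000 byte arrays of roughly 100 KB
    // spread over 100 partitions, persisted to memory with disk fallback.
    (1 to 100).map { _ =>
      sc.parallelize(1 to 1000, 100)
        .map(x => new Array[Byte](1024 * 1024 / 10))
        .persist(StorageLevel.MEMORY_AND_DISK)
    // Materialize each RDD in turn; caching later RDDs forces earlier
    // blocks to be evicted from memory and dropped to disk.
    }.map { x => x.count(); x }
    val endTime = System.currentTimeMillis
    // Elapsed wall-clock time in seconds.
    println((endTime - startTime) / 1000.0)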
    ```
    
    ----
    
    **Prior to this patch**, I observed extreme contention for the 
`UnifiedMemoryManager`'s monitor:
    
    
![image](https://cloud.githubusercontent.com/assets/50748/13939792/453e3ac6-ef95-11e5-80fc-c12ea5d12a66.png)
    
    
![image](https://cloud.githubusercontent.com/assets/50748/13939772/273e4a66-ef95-11e5-80c2-fb181733d2c0.png)
    
    ----
    
    **After this patch**, the contention on the `UnifiedMemoryManager` lock is 
significantly reduced and the lock contention bottlenecks shifted to other 
sources:
    
    
![image](https://cloud.githubusercontent.com/assets/50748/13939698/9d70557c-ef94-11e5-9af5-c99f2a342793.png)
    
    
![image](https://cloud.githubusercontent.com/assets/50748/13939711/b0229bc6-ef94-11e5-8bff-520f58da726e.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-3000

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11874.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11874
    
----
commit e4aa6abe86ae6fb3e20208863a6807468605b48d
Author: Josh Rosen <[email protected]>
Date:   2016-03-21T19:45:18Z

    Remove unreachable code in BlockManager.dropFromMemory
    
    This method is only ever called on blocks which are present in the
    MemoryStore, so we don’t need a bunch of the old error-handling
    code to cope with this being called on non-existent blocks.

commit c564a40589ed98b148f41ca64f3aaac1fdcec06e
Author: Josh Rosen <[email protected]>
Date:   2016-03-21T21:28:08Z

    Minor re-arrangement of locking code to reduce contention.

commit 847fdae7a6fbda13ac5c2537671b2c27363c8175
Author: Josh Rosen <[email protected]>
Date:   2016-03-21T23:32:54Z

    Narrow scope of synchronization in acquireStorageMemory.

commit 5854662717810d27e386fe51cb4d11bf0377f20c
Author: Josh Rosen <[email protected]>
Date:   2016-03-21T23:37:21Z

    Improve locking in code unroll memory transfer code.

commit b03ff592feb090e2e6f1801088baa1cbcc8eb0f1
Author: Josh Rosen <[email protected]>
Date:   2016-03-22T00:42:53Z

    Merge remote-tracking branch 'origin/master' into SPARK-3000

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

