GitHub user tdas reopened a pull request:

    https://github.com/apache/spark/pull/126

    [SPARK-1103] [WIP] Automatic garbage collection of RDD, shuffle and 
broadcast data

    This PR allows Spark to automatically clean up metadata and data related to persisted RDDs, shuffles, and broadcast variables when the corresponding RDDs, shuffles, and broadcast variables fall out of scope in the driver program. This is still a work in progress, as broadcast cleanup has not yet been implemented.
    
    **Implementation Details** 
    A new class, `ContextCleaner`, is responsible for cleaning up all of this state. It is instantiated as part of a `SparkContext`. The RDD and `ShuffleDependency` classes have an overridden `finalize()` method that gets called whenever their instances go out of scope. The `finalize()` method enqueues the object's identifier (i.e. RDD ID, shuffle ID, etc.) with the `ContextCleaner`; this is a very short and cheap operation and should not significantly affect the garbage collection mechanism. The `ContextCleaner` then performs the actual cleanup on a separate thread, as detailed below.
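    The enqueue-then-clean flow above can be sketched roughly as follows. This is a hypothetical simplification, not the actual `ContextCleaner` code; the task types and names are illustrative:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Simplified sketch: finalize() enqueues a cleanup task (a cheap
// operation), and a separate daemon thread drains the queue and
// performs the actual, possibly expensive, cleanup.
sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanShuffle(shuffleId: Int) extends CleanupTask

class SimpleContextCleaner(doClean: CleanupTask => Unit) {
  private val queue = new LinkedBlockingQueue[CleanupTask]()
  @volatile private var stopped = false

  // Called from finalize(); must stay cheap so GC is not slowed down.
  def enqueue(task: CleanupTask): Unit = queue.put(task)

  private val cleaningThread = new Thread("context-cleaner") {
    override def run(): Unit =
      try {
        while (!stopped) {
          doClean(queue.take())  // blocks until a task is available
        }
      } catch {
        case _: InterruptedException => ()  // stop() interrupts us
      }
  }
  cleaningThread.setDaemon(true)

  def start(): Unit = cleaningThread.start()
  def stop(): Unit = { stopped = true; cleaningThread.interrupt() }
}
```

An unbounded `LinkedBlockingQueue` keeps the producing side non-blocking, which matters because `enqueue` runs on the JVM's finalizer thread.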
    
    *RDD cleanup:*
    `ContextCleaner` calls `RDD.unpersist()` to clean up persisted RDDs. As for metadata, the `DAGScheduler` automatically cleans up all metadata related to an RDD after all jobs using it have completed. Only `SparkContext.persistentRDDs` keeps strong references to persisted RDDs. The `TimeStampedHashMap` previously used for that has been replaced by a `TimeStampedWeakValueHashMap`, which keeps only weak references to the RDDs, allowing them to be garbage collected.
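    The weak-value idea can be illustrated with a minimal sketch; this is not the actual `TimeStampedWeakValueHashMap`, just the principle behind it:

```scala
import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap

// Minimal weak-value map: values are held only through WeakReferences,
// so an RDD tracked here can still be garbage collected once the driver
// program drops its last strong reference to it.
class WeakValueMap[K, V <: AnyRef] {
  private val underlying = new TrieMap[K, WeakReference[V]]()

  def put(k: K, v: V): Unit = underlying.put(k, new WeakReference(v))

  // Returns None both for missing keys and for already-collected values.
  def get(k: K): Option[V] = underlying.get(k).flatMap(r => Option(r.get()))

  // Drops entries whose weak reference the GC has already cleared.
  def clearNullValues(): Unit =
    for ((k, ref) <- underlying if ref.get() == null) underlying.remove(k)
}
```

Without some pass like `clearNullValues()`, keys whose values have already been collected would accumulate in the map indefinitely.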
    
    *Shuffle cleanup:*
    A new `BlockManager` message, `RemoveShuffle(<shuffle ID>)`, asks the `BlockManagerMaster` and all currently active `BlockManager`s to delete all the disk blocks related to that shuffle ID. `ContextCleaner` cleans up shuffle data using this message and also cleans up the corresponding metadata in the driver's `MapOutputTracker`. The `MapOutputTracker` on the workers, which caches shuffle metadata, maintains a `BoundedHashMap` to limit the amount of shuffle information it caches; refetching that information from the driver is not too costly.
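    The message-based cleanup can be sketched as follows. `RemoveShuffle` is the message named above, but the surrounding classes here are simplified stand-ins, not the real `BlockManager` internals:

```scala
import scala.collection.mutable

// Illustrative stand-ins for Spark's block identifiers and messages.
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
case class RemoveShuffle(shuffleId: Int)

class SimpleBlockManager {
  private val blocks = mutable.Map[ShuffleBlockId, Array[Byte]]()

  def putBlock(id: ShuffleBlockId, data: Array[Byte]): Unit = blocks(id) = data
  def numBlocks: Int = blocks.size

  // Handler for RemoveShuffle: delete every block belonging to the
  // given shuffle ID, wherever it is stored.
  def handle(msg: RemoveShuffle): Unit = {
    val toRemove = blocks.keys.filter(_.shuffleId == msg.shuffleId).toSeq
    toRemove.foreach(blocks.remove)
  }
}
```

In the actual implementation the master forwards this message to every active `BlockManager`, so the deletion happens cluster-wide rather than on a single node.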
    
    *Broadcast cleanup:*
    To be done. [This PR](https://github.com/apache/incubator-spark/pull/543/) adds a mechanism for explicit cleanup of broadcast variables. `Broadcast.finalize()` will enqueue its own ID with the `ContextCleaner`, and that PR's mechanism will be used to unpersist the broadcast data.
    
    *Other cleanup:*
    `ShuffleMapTask` and `ResultTask` used to cache task information in `TimeStampedHashMap`s with TTL-based cleanup, so nothing got cleaned up if no TTL was set. They now use a `BoundedHashMap` instead, which keeps only a limited amount of map output information; the cost of repopulating the cache when necessary is very small.
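    A bounded map with this behavior can be sketched on top of `java.util.LinkedHashMap`; this illustrates the idea, not the actual `BoundedHashMap`:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Insertion-ordered map that evicts its oldest entry once a fixed
// capacity is exceeded; evicted map-output info is simply refetched
// from the driver when it is needed again.
class SimpleBoundedMap[K, V](maxEntries: Int)
    extends JLinkedHashMap[K, V](16, 0.75f, false) {
  override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
    size() > maxEntries
}
```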
    
    **Current state of implementation**
    Implemented RDD and shuffle cleanup. Things left to be done:
    - Cleanup of broadcast variables.
    - Automatic cleanup of keys whose weak-referenced values have already been collected in `TimeStampedWeakValueHashMap`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark state-cleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/126.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #126
    
----
commit e427a9eeb8d6b5def3a5ff1b766458588d8b05a9
Author: Tathagata Das <[email protected]>
Date:   2014-02-14T03:14:31Z

    Added ContextCleaner to automatically clean RDDs and shuffles when they 
fall out of scope. Also replaced TimeStampedHashMap to BoundedHashMaps and 
TimeStampedWeakValueHashMap for the necessary hashmap behavior.

commit 8512612036011b5cf688a1643d0d46f144a0f15e
Author: Tathagata Das <[email protected]>
Date:   2014-02-14T08:01:04Z

    Changed TimeStampedHashMap to use WrappedJavaHashMap.

commit a24fefccbc93675c939621ea03476b7f993abe4e
Author: Tathagata Das <[email protected]>
Date:   2014-03-11T03:46:12Z

    Merge remote-tracking branch 'apache/master' into state-cleanup
    
    Conflicts:
        core/src/main/scala/org/apache/spark/MapOutputTracker.scala
        core/src/main/scala/org/apache/spark/SparkContext.scala
        core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
        core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala
        core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
        core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

commit cb0a5a66ce7dbc2ded209d8bdd0cd88953f70b5f
Author: Tathagata Das <[email protected]>
Date:   2014-03-11T18:33:43Z

    Fixed docs and styles.

commit ae9da88b3a88a47e9d59b2be327680190c7e904b
Author: Tathagata Das <[email protected]>
Date:   2014-03-12T00:56:36Z

    Removed unncessary TimeStampedHashMap from DAGScheduler, added try-catches 
in finalize() methods, and replaced ArrayBlockingQueue to LinkedBlockingQueue 
to avoid blocking in Java's finalizing thread.

commit e61daa02e9e221625489ea1dd434cf6d3192e474
Author: Tathagata Das <[email protected]>
Date:   2014-03-13T02:08:42Z

    Modifications based on the comments on PR 126.

commit a7260d346882bcdfe6e5014c52960017fb602300
Author: Tathagata Das <[email protected]>
Date:   2014-03-17T22:49:50Z

    Added try-catch in context cleaner and null value cleaning in 
TimeStampedWeakValueHashMap.

commit 892b9520d828cfa7049e6ec70345b3502b139a8e
Author: Tathagata Das <[email protected]>
Date:   2014-03-18T22:09:24Z

    Removed use of BoundedHashMap, and made BlockManagerSlaveActor cleanup 
shuffle metadata in MapOutputTrackerWorker.

commit e1fba5fee616d810afc2f33979af34721c35238e
Author: Tathagata Das <[email protected]>
Date:   2014-03-19T00:06:51Z

    Style fix

commit f2881fd7d4afaead50632418c4a927ecd09eac65
Author: Tathagata Das <[email protected]>
Date:   2014-03-25T18:41:22Z

    Changed ContextCleaner to use ReferenceQueue instead of finalizer

commit 620eca349808befa6c339bc5acc351c484495557
Author: Tathagata Das <[email protected]>
Date:   2014-03-25T20:05:47Z

    Changes based on PR comments.

commit a00730762891723aa853ea55306713f2ce94cd65
Author: Tathagata Das <[email protected]>
Date:   2014-03-25T21:21:17Z

    Merge remote-tracking branch 'apache/master' into state-cleanup
    
    Conflicts:
        core/src/main/scala/org/apache/spark/Dependency.scala
        core/src/main/scala/org/apache/spark/MapOutputTracker.scala
        core/src/main/scala/org/apache/spark/SparkContext.scala
        core/src/main/scala/org/apache/spark/SparkEnv.scala
        core/src/main/scala/org/apache/spark/rdd/RDD.scala
        core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
        core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
        core/src/main/scala/org/apache/spark/storage/BlockManager.scala
        core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
        core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

commit d2f8b977f2d78689512b67c82627f0b22e64daa7
Author: Tathagata Das <[email protected]>
Date:   2014-03-25T21:36:07Z

    Removed duplicate unpersistRDD.

commit 6c9dcf608a0628a70b1ef48bde985e1e37f7bac4
Author: Tathagata Das <[email protected]>
Date:   2014-03-25T22:14:33Z

    Added missing Apache license

----

