GitHub user tdas reopened a pull request:
https://github.com/apache/spark/pull/126
[SPARK-1103] [WIP] Automatic garbage collection of RDD, shuffle and
broadcast data
This PR allows Spark to automatically cleanup metadata and data related to
persisted RDDs, shuffles and broadcast variables when the corresponding RDDs,
shuffles and broadcast variables fall out of scope from the driver program.
This is still a work in progress as broadcast cleanup has not been implemented.
**Implementation Details**
A new class `ContextCleaner` is responsible for cleaning all this state. It is
instantiated as part of a `SparkContext`. RDD and ShuffleDependency classes
have an overridden `finalize()` method that gets called whenever their instances
go out of scope and are garbage collected. The `finalize()` method enqueues the object's identifier
(i.e. RDD ID, shuffle ID, etc.) with the `ContextCleaner`, which is a very
short and cheap operation and should not significantly affect the garbage
collection mechanism. The `ContextCleaner`, on a different thread, performs the
cleanup, whose details are given below.
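The enqueue-from-`finalize()` pattern can be sketched in plain Java, since the mechanism is a generic JVM one; the names below (`CleanerSketch`, `TrackedRdd`, `toClean`) are hypothetical, not Spark's actual API:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CleanerSketch {
    // Unbounded queue: finalize() must only do a cheap, non-blocking
    // enqueue so it never stalls the JVM's finalizer thread.
    static final LinkedBlockingQueue<Integer> toClean = new LinkedBlockingQueue<>();

    static class TrackedRdd {
        final int id;
        TrackedRdd(int id) { this.id = id; }

        @Override
        protected void finalize() {
            toClean.offer(id);  // record the ID; real cleanup happens elsewhere
        }
    }

    public static void main(String[] args) throws Exception {
        // A separate cleanup thread drains IDs and performs the actual (slow) work.
        Thread cleaner = new Thread(() -> {
            try {
                Integer id = toClean.poll(2, TimeUnit.SECONDS);
                if (id != null) System.out.println("cleaning up RDD " + id);
            } catch (InterruptedException ignored) { }
        });
        cleaner.start();

        new TrackedRdd(42);        // immediately unreachable
        System.gc();               // best effort: finalization timing is not guaranteed
        System.runFinalization();
        cleaner.join();
    }
}
```

Note the use of an unbounded `LinkedBlockingQueue`: with a bounded queue, a full queue would block inside `finalize()` and stall garbage collection.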
*RDD cleanup:*
`ContextCleaner` calls `RDD.unpersist()` to clean up persisted RDDs.
Regarding metadata, the `DAGScheduler` automatically cleans up all metadata
related to an RDD after all its jobs have completed. Only
`SparkContext.persistentRDDs` keeps strong references to persisted RDDs. The
`TimeStampedHashMap` used for that has been replaced by a
`TimeStampedWeakValueHashMap`, which keeps only weak references to the RDDs,
allowing them to be garbage collected.
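The weak-value idea can be sketched as a minimal Java map (illustrative only; `WeakValueMap` is a hypothetical stand-in for `TimeStampedWeakValueHashMap`, which additionally timestamps its entries):

```java
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A map that holds its values only weakly: once no strong references to a
// value remain elsewhere, the GC may collect it and get() observes null.
public class WeakValueMap<K, V> {
    private final Map<K, WeakReference<V>> map = new ConcurrentHashMap<>();

    public void put(K key, V value) {
        map.put(key, new WeakReference<>(value));
    }

    public V get(K key) {
        WeakReference<V> ref = map.get(key);
        return ref == null ? null : ref.get();  // null if the value was collected
    }

    // Drop entries whose values have been garbage collected; the PR lists
    // automating this step as remaining work.
    public void prune() {
        map.entrySet().removeIf(e -> e.getValue().get() == null);
    }

    public int size() {
        return map.size();
    }
}
```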
*Shuffle cleanup:*
A new `BlockManager` message, `RemoveShuffle(<shuffle ID>)`, asks the
`BlockManagerMaster` and the currently active `BlockManager`s to delete all the
disk blocks related to the shuffle ID. `ContextCleaner` cleans up shuffle data
using this message and also cleans up the metadata in the driver's
`MapOutputTracker`. The `MapOutputTracker` at the workers, which caches the
shuffle metadata, maintains a `BoundedHashMap` to limit the amount of shuffle
information it caches. Refetching the shuffle information from the driver is
not too costly.
*Broadcast cleanup:*
To be done. [This PR](https://github.com/apache/incubator-spark/pull/543/)
adds a mechanism for explicit cleanup of broadcast variables.
`Broadcast.finalize()` will enqueue its own ID with the `ContextCleaner`, and
that PR's mechanism will be used to unpersist the broadcast data.
*Other cleanup:*
`ShuffleMapTask` and `ResultTask` cached task information and used TTL-based
cleanup (via `TimeStampedHashMap`), so nothing got cleaned up if the TTL was
not set. Instead, they now use a `BoundedHashMap` to keep a limited amount of
map output information. The cost of repopulating the cache when necessary is
very small.
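A bounded cache of this kind can be sketched in Java with `LinkedHashMap`'s eviction hook (illustrative; `BoundedCache` is a hypothetical stand-in for Spark's `BoundedHashMap`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// A size-bounded, LRU-evicting map: once capacity is exceeded, the least
// recently accessed entry is dropped and must be refetched or recomputed.
public class BoundedCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public BoundedCache(int maxEntries) {
        super(16, 0.75f, true);  // accessOrder = true gives LRU iteration order
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put(); returning true evicts the eldest entry.
        return size() > maxEntries;
    }
}
```

This trades memory for occasional recomputation, which matches the note above that repopulating the cache is cheap.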
**Current state of implementation**
Implemented RDD and shuffle cleanup. Things left to be done:
- Clean up broadcast variables.
- Automatically clean up keys whose weak-referenced values have been garbage
collected in `TimeStampedWeakValueHashMap`.
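As an aside on the weak-reference bookkeeping: instead of overriding `finalize()`, the JVM's `ReferenceQueue` can report collected objects directly (a later commit in this PR moves `ContextCleaner` to this approach). The sketch below is illustrative, with hypothetical names:

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public class RefQueueSketch {
    // Weak references registered with this queue are enqueued on it
    // after their referents are garbage collected.
    static final ReferenceQueue<Object> queue = new ReferenceQueue<>();

    // The reference carries the ID to clean up, because the referent
    // itself is already gone by the time the reference is enqueued.
    static class CleanupRef extends WeakReference<Object> {
        final int id;
        CleanupRef(Object referent, int id) {
            super(referent, queue);
            this.id = id;
        }
    }

    public static void main(String[] args) throws Exception {
        CleanupRef ref = new CleanupRef(new Object(), 42);  // referent is unreachable
        System.gc();  // best effort: enqueueing timing is not guaranteed
        Reference<?> r = queue.remove(2000);  // block until enqueued or timeout
        if (r instanceof CleanupRef) {
            System.out.println("clean up id " + ((CleanupRef) r).id);
        }
    }
}
```

Unlike `finalize()`, this keeps all cleanup logic off the finalizer thread entirely.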
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark state-cleanup
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/126.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #126
----
commit e427a9eeb8d6b5def3a5ff1b766458588d8b05a9
Author: Tathagata Das <[email protected]>
Date: 2014-02-14T03:14:31Z
Added ContextCleaner to automatically clean RDDs and shuffles when they
fall out of scope. Also replaced TimeStampedHashMap to BoundedHashMaps and
TimeStampedWeakValueHashMap for the necessary hashmap behavior.
commit 8512612036011b5cf688a1643d0d46f144a0f15e
Author: Tathagata Das <[email protected]>
Date: 2014-02-14T08:01:04Z
Changed TimeStampedHashMap to use WrappedJavaHashMap.
commit a24fefccbc93675c939621ea03476b7f993abe4e
Author: Tathagata Das <[email protected]>
Date: 2014-03-11T03:46:12Z
Merge remote-tracking branch 'apache/master' into state-cleanup
Conflicts:
core/src/main/scala/org/apache/spark/MapOutputTracker.scala
core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
commit cb0a5a66ce7dbc2ded209d8bdd0cd88953f70b5f
Author: Tathagata Das <[email protected]>
Date: 2014-03-11T18:33:43Z
Fixed docs and styles.
commit ae9da88b3a88a47e9d59b2be327680190c7e904b
Author: Tathagata Das <[email protected]>
Date: 2014-03-12T00:56:36Z
Removed unncessary TimeStampedHashMap from DAGScheduler, added try-catches
in finalize() methods, and replaced ArrayBlockingQueue to LinkedBlockingQueue
to avoid blocking in Java's finalizing thread.
commit e61daa02e9e221625489ea1dd434cf6d3192e474
Author: Tathagata Das <[email protected]>
Date: 2014-03-13T02:08:42Z
Modifications based on the comments on PR 126.
commit a7260d346882bcdfe6e5014c52960017fb602300
Author: Tathagata Das <[email protected]>
Date: 2014-03-17T22:49:50Z
Added try-catch in context cleaner and null value cleaning in
TimeStampedWeakValueHashMap.
commit 892b9520d828cfa7049e6ec70345b3502b139a8e
Author: Tathagata Das <[email protected]>
Date: 2014-03-18T22:09:24Z
Removed use of BoundedHashMap, and made BlockManagerSlaveActor cleanup
shuffle metadata in MapOutputTrackerWorker.
commit e1fba5fee616d810afc2f33979af34721c35238e
Author: Tathagata Das <[email protected]>
Date: 2014-03-19T00:06:51Z
Style fix
commit f2881fd7d4afaead50632418c4a927ecd09eac65
Author: Tathagata Das <[email protected]>
Date: 2014-03-25T18:41:22Z
Changed ContextCleaner to use ReferenceQueue instead of finalizer
commit 620eca349808befa6c339bc5acc351c484495557
Author: Tathagata Das <[email protected]>
Date: 2014-03-25T20:05:47Z
Changes based on PR comments.
commit a00730762891723aa853ea55306713f2ce94cd65
Author: Tathagata Das <[email protected]>
Date: 2014-03-25T21:21:17Z
Merge remote-tracking branch 'apache/master' into state-cleanup
Conflicts:
core/src/main/scala/org/apache/spark/Dependency.scala
core/src/main/scala/org/apache/spark/MapOutputTracker.scala
core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/SparkEnv.scala
core/src/main/scala/org/apache/spark/rdd/RDD.scala
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
commit d2f8b977f2d78689512b67c82627f0b22e64daa7
Author: Tathagata Das <[email protected]>
Date: 2014-03-25T21:36:07Z
Removed duplicate unpersistRDD.
commit 6c9dcf608a0628a70b1ef48bde985e1e37f7bac4
Author: Tathagata Das <[email protected]>
Date: 2014-03-25T22:14:33Z
Added missing Apache license
----