Daniel,

Is SPARK-1103 <https://issues.apache.org/jira/browse/SPARK-1103> related to your example? Automatic unpersist()-ing of unreferenced RDDs would be nice.
Nick

On Tue, May 27, 2014 at 12:28 PM, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> I keep bumping into a problem with persisting RDDs. Consider this (silly)
> example:
>
> def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
>   val count = input.count
>   if (count % 2 == 0) {
>     return input.filter(_ % 2 == 1)
>   } else {
>     return input.filter(_ % 2 == 0)
>   }
> }
>
> The situation is that we want to do two things with an RDD (a "count" and
> a "filter" in the example). The "input" RDD may represent a very expensive
> calculation. So it would make sense to add an "input.cache()" line at the
> beginning. But where do we put "input.unpersist()"?
>
> input.cache()
> val count = input.count
> val result = input.filter(...)
> input.unpersist()
> return result
>
> "input.filter()" is lazy, so this does not work as expected. We only want
> to release "input" from the cache once nothing depends on it anymore. Maybe
> "result" was garbage collected. Maybe "result" itself has been cached. But
> there is no way to detect such conditions. Our current approach is to just
> leave the RDD cached, and it will get dumped at some point anyway. Is there
> a better solution? Thanks for any tips.
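One partial workaround (my own sketch, not something from the thread) is to cache the result and force it with an action before unpersisting the input. That way the lazy filter has already run while "input" was still cached. It costs an extra pass over "result" and assumes the caller takes ownership of the cached result; it also doesn't help if "result" is later recomputed after eviction:

```scala
import org.apache.spark.rdd.RDD

// Hedged sketch: force "result" to materialize while "input" is still
// cached, then release "input". Assumes a SparkContext is already running.
def everySecondFromBehind(input: RDD[Int]): RDD[Int] = {
  input.cache()
  val count = input.count()
  val result =
    if (count % 2 == 0) input.filter(_ % 2 == 1)
    else input.filter(_ % 2 == 0)
  result.cache()
  result.count()     // action: computes and caches "result" now, not lazily
  input.unpersist()  // safe-ish: the cached "result" no longer reads "input"
  result             // caller is responsible for unpersisting "result"
}
```

The eager count() is what makes the unpersist() safe in the common case; without it, the filter would only run after "input" was already dropped from the cache.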