I asked the Spark community the same question a while ago
(http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3CCAByMnGtm2s2tyqLzw%2BMdGqgNBLbfhE6-kkZ4OPY4ANfZaDSu7Q%40mail.gmail.com%3E).
This is my understanding of how Spark works, but I'd like one of the
Spark maintainers to confirm it.

There are only two ways to remove RDD data from the cache:
1) The programmer explicitly calls unpersist() (or a similar API) on the RDD.
2) When Spark's cache is full, Spark automatically evicts cached
partitions to free up memory (spilling them to disk only if the RDD's
storage level allows it).
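Option (1) looks like this in practice. This is only a sketch, reusing
the variable names from the code quoted below; the "..." stands for
whatever jobs actually consume the RDD:

  val stationsAboveHWM = stationWaterMarkDiffRDD
    .filter { case (id, nblwm, nahwm) => nahwm > 0 }
    .map { case (s, l, h) => (s, h) }
    .cache()

  // ... run the actions that need stationsAboveHWM ...

  // Explicitly free the cached partitions instead of waiting
  // for Spark's LRU eviction to get around to them.
  stationsAboveHWM.unpersist()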

Spark uses an LRU policy for cache eviction, which means Spark will
eventually remove cached data, but it may keep it in the cache longer
than needed. In your case, the memory allocated to stationsAboveHWM
could be reclaimed as soon as the getMoveReco() method returns, but
stationsAboveHWM will remain in the cache until Spark evicts it.
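The access-order idea behind LRU eviction can be illustrated with a toy
cache built on java.util.LinkedHashMap. This is a deliberately simplified
sketch, not Spark's actual code (Spark's BlockManager also accounts for
block sizes, storage levels, and so on):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// A tiny LRU cache: accessOrder = true makes iteration order follow
// access recency, and removeEldestEntry evicts when we exceed capacity.
class LruCache[K, V](maxEntries: Int)
    extends JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
  override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
    size() > maxEntries
}

val cache = new LruCache[String, Int](2)
cache.put("rdd1", 1)
cache.put("rdd2", 2)
cache.get("rdd1")    // touch rdd1, so rdd2 is now the least recently used
cache.put("rdd3", 3) // over capacity: evicts rdd2, keeps rdd1 and rdd3
```

Note that rdd2 stays cached until the insertion of rdd3 forces it out,
which is exactly why an unused cached RDD can linger in memory longer
than needed.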

Thanks,
Meisam

On Sun, Dec 8, 2013 at 12:05 PM, K. Shankari <[email protected]> wrote:
> I have some local variables in a function that are generated by a shuffle
> operation. To improve performance, I chose to cache() them, with the
> assumption that they would be automatically removed from the cache when they
> were deallocated.
>
> However, I am not sure that this is the case.
>
> I recently changed my code from cache()ing one local variable to
> cache()ing 2-3 of them, and I have consistently started running out of memory.
>
> Again, the underlying dataset is ~2 MB, and while the source RDD for the
> groupBy is fairly large, the cached variables are pretty small: ~10 rows of
> (Int, Int) and (Int, Int, Int)
>
> Can any of the maintainers clarify whether RDDs are supposed to be removed
> from the cache when their associated local variables are deallocated?
>
> Here's my shuffle operation:
>
>   def getFinalState() = {
>     def maxTs(ss1: StationStatus, ss2: StationStatus) = {
>       if(ss1.ts > ss2.ts) ss1 else ss2
>     }
>     peer.map(ss => (ss.id, ss)).reduceByKey(maxTs)
>   }
>
> and here's where I cache it:
>
> def getMoveReco(....) = {
>   val finalStateRDD = stationStatusRDD.getFinalState
>   val currEmptyStations = finalStateRDD.filter{case(stnId, ss) => ss.nBikes == 0}.cache
>   val currFullStations = finalStateRDD.filter{case(stnId, ss) => ss.nEmpty == 0}.cache
>   // this is of the form (id, nBikesBelowLWM, nBikesAboveHWM)
>   val stationWaterMarkDiffRDD = getWaterMarkDiffRDD(finalStateRDD).cache
>   val stationsAboveHWM = stationWaterMarkDiffRDD.filter{case(id, nblwm, nahwm) => nahwm > 0}.map{case(s, l, h) => (s, h)}.cache
>   val stationsBelowLWM = stationWaterMarkDiffRDD.filter{case(id, nblwm, nahwm) => nblwm > 0}.map{case(s, l, h) => (s, l)}.cache
>   val balancedStations = stationWaterMarkDiffRDD.filter{case(id, nblwm, nahwm) => nblwm <= 0 && nahwm <= 0}.cache
> ...
> }
>
> Thanks,
> Shankari
