I asked the same question on the Spark mailing list a while ago (http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3CCAByMnGtm2s2tyqLzw%2BMdGqgNBLbfhE6-kkZ4OPY4ANfZaDSu7Q%40mail.gmail.com%3E). This is my understanding of how Spark works, but I'd like one of the Spark maintainers to confirm it.
There are only two ways to remove RDD data from the cache:

1) The programmer explicitly calls unpersist() (or another cache-management API) on an RDD.
2) When Spark's cache is full, Spark automatically evicts cached data to free up main memory (dropping the partitions, or spilling them to disk if the storage level permits).

Spark uses an LRU policy for cache eviction, which means Spark will eventually remove cached data, but it may keep it in the cache longer than needed. In your case, the memory allocated to stationsAboveHWM could be reclaimed immediately after the getMoveReco() method returns, but stationsAboveHWM will remain in the cache.

Thanks,
Meisam

On Sun, Dec 8, 2013 at 12:05 PM, K. Shankari <[email protected]> wrote:
> I have some local variables in a function that are generated by a shuffle
> operation. To improve performance, I chose to cache() them, with the
> assumption that they would be automatically removed from the cache when
> they were deallocated.
>
> However, I am not sure that this is the case.
>
> I recently changed my code from cache()ing one local variable to
> cache()ing 2-3, and I have consistently started running out of memory.
>
> Again, the underlying dataset is ~2MB, and while the source RDD for the
> groupBy is fairly large, the cached variables are pretty small: ~10 rows
> of (Int, Int) and (Int, Int, Int).
>
> Can any of the maintainers clarify whether RDDs are supposed to be removed
> from the cache when their associated local variables are deallocated?
>
> Here's my shuffle operation:
>
>     def getFinalState() = {
>       def maxTs(ss1: StationStatus, ss2: StationStatus) = {
>         if (ss1.ts > ss2.ts) ss1 else ss2
>       }
>       peer.map(ss => (ss.id, ss)).reduceByKey(maxTs)
>     }
>
> and here's where I cache it:
>
>     def getMoveReco(....) = {
>       val finalStateRDD = stationStatusRDD.getFinalState
>       val currEmptyStations =
>         finalStateRDD.filter { case (stnId, ss) => ss.nBikes == 0 }.cache
>       val currFullStations =
>         finalStateRDD.filter { case (stnId, ss) => ss.nEmpty == 0 }.cache
>       // this is of the form (id, nBikesBelowLWM, nBikesAboveHWM)
>       val stationWaterMarkDiffRDD = getWaterMarkDiffRDD(finalStateRDD).cache
>       val stationsAboveHWM = stationWaterMarkDiffRDD
>         .filter { case (id, nblwm, nahwm) => nahwm > 0 }
>         .map { case (s, l, h) => (s, h) }.cache
>       val stationsBelowLWM = stationWaterMarkDiffRDD
>         .filter { case (id, nblwm, nahwm) => nblwm > 0 }
>         .map { case (s, l, h) => (s, l) }.cache
>       val balancedStations = stationWaterMarkDiffRDD
>         .filter { case (id, nblwm, nahwm) => nblwm <= 0 && nahwm <= 0 }.cache
>       ...
>     }
>
> Thanks,
> Shankari
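P.S. To make the point concrete: since cached RDDs are not dropped when their local variables go out of scope, you can release the cache yourself by calling unpersist() once the results you need have been materialized. Here is a minimal sketch of that pattern (the function name and the simplified (id, nblwm, nahwm) input are hypothetical, just mirroring the shapes in your code):

```scala
import org.apache.spark.rdd.RDD

// Sketch: cache an intermediate RDD, force the result we actually need,
// then explicitly unpersist instead of waiting for LRU eviction.
def stationsAboveHWMWithCleanup(
    stationWaterMarkDiffRDD: RDD[(Int, Int, Int)]): Array[(Int, Int)] = {
  // Cache because the intermediate would otherwise be recomputed per action.
  stationWaterMarkDiffRDD.cache()

  val stationsAboveHWM = stationWaterMarkDiffRDD
    .filter { case (_, _, nahwm) => nahwm > 0 }
    .map { case (id, _, nahwm) => (id, nahwm) }

  // Materialize the result before dropping the cache it depends on.
  val result = stationsAboveHWM.collect()

  // Reclaim executor memory now, rather than at some later LRU eviction.
  stationWaterMarkDiffRDD.unpersist()
  result
}
```

The ordering matters: collect() (or any action) must run before unpersist(), otherwise the cached data is dropped before it is ever used and the lineage is recomputed from scratch.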
