I have some local variables in a function that are generated by a shuffle
operation. To improve performance, I chose to cache() them, with the
assumption that they would be automatically removed from the cache when
they were deallocated.

However, I am not sure that this is the case.

I recently changed my code from cache()ing one local variable to
cache()ing 2-3, and I have consistently been running out of memory since.

Again, the underlying dataset is only ~2 MB. While the source RDD for the
groupBy is fairly large, the cached variables are small: ~10 rows of
(Int, Int) and (Int, Int, Int).

Can any of the maintainers clarify whether RDDs are supposed to be removed
from the cache when their associated local variables are deallocated?

Here's my shuffle operation:

  def getFinalState() = {
    def maxTs(ss1: StationStatus, ss2: StationStatus) = {
      if(ss1.ts > ss2.ts) ss1 else ss2
    }
    peer.map(ss => (ss.id, ss)).reduceByKey(maxTs)
  }

and here's where I cache it:

  def getMoveReco(....) = {
    val finalStateRDD = stationStatusRDD.getFinalState
    val currEmptyStations =
      finalStateRDD.filter { case (stnId, ss) => ss.nBikes == 0 }.cache
    val currFullStations =
      finalStateRDD.filter { case (stnId, ss) => ss.nEmpty == 0 }.cache
    // this is of the form (id, nBikesBelowLWM, nBikesAboveHWM)
    val stationWaterMarkDiffRDD = getWaterMarkDiffRDD(finalStateRDD).cache
    val stationsAboveHWM = stationWaterMarkDiffRDD
      .filter { case (id, nblwm, nahwm) => nahwm > 0 }
      .map { case (s, l, h) => (s, h) }
      .cache
    val stationsBelowLWM = stationWaterMarkDiffRDD
      .filter { case (id, nblwm, nahwm) => nblwm > 0 }
      .map { case (s, l, h) => (s, l) }
      .cache
    val balancedStations = stationWaterMarkDiffRDD
      .filter { case (id, nblwm, nahwm) => nblwm <= 0 && nahwm <= 0 }
      .cache
    ...
  }
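
For reference, whatever happens at deallocation, Spark also lets a cached
RDD be released explicitly with unpersist(). The following is only a minimal
sketch of that pattern, not the code above: the UnpersistSketch object, the
withCached helper, and the sample (stationId, nBikes) data are hypothetical
names made up for illustration, and it assumes a local SparkContext.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnpersistSketch {
  // Hypothetical helper: marks `rdd` as cached, runs `body` against it,
  // and always calls unpersist() afterwards, even if `body` throws.
  def withCached[T, R](rdd: RDD[T])(body: RDD[T] => R): R = {
    rdd.cache()
    try body(rdd)
    finally rdd.unpersist()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("unpersist-sketch")
    val sc = new SparkContext(conf)
    try {
      // Made-up stand-in for finalStateRDD: (stationId, nBikes)
      val finalState = sc.parallelize(Seq((1, 0), (2, 5), (3, 0)))
      val emptyStations = finalState.filter { case (_, nBikes) => nBikes == 0 }

      // The RDD is cached only for the duration of this block.
      val nEmpty = withCached(emptyStations) { cached => cached.count() }

      println(s"empty stations: $nEmpty")
      // getPersistentRDDs lists the RDDs this context still has marked as cached.
      println(s"RDDs still marked as cached: ${sc.getPersistentRDDs.size}")
    } finally {
      sc.stop()
    }
  }
}
```

In getMoveReco this would amount to calling unpersist() on each cached
variable once the last action that uses it has run.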

Thanks,
Shankari
