That's right, with the minor clarification that Spark will only spill
cached data to disk if it was persisted with a StorageLevel that allows
spilling to disk (e.g. MEMORY_AND_DISK); otherwise, it will simply drop the
blocks and they'll have to be recomputed from lineage if they're accessed
again.
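
For what it's worth, here's a minimal sketch of the difference (someRdd and
parse are just placeholders, not anything from your code):

  import org.apache.spark.storage.StorageLevel

  // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY): blocks that
  // don't fit in memory are dropped and recomputed from lineage when they
  // are accessed again.
  val memOnly = someRdd.map(parse).cache()

  // With MEMORY_AND_DISK, blocks that don't fit in memory are spilled to
  // local disk and re-read later instead of being recomputed.
  val memAndDisk = someRdd.map(parse).persist(StorageLevel.MEMORY_AND_DISK)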

It would be cool to automatically unpersist() RDDs when we know that they're
never going to be accessed again, but I don't see an easy way to implement
this.  An RDD might participate in a lineage chain that keeps a reference
to it, so even though that RDD might not ever be explicitly accessed again
by the driver code there might be other Spark-internal references to it
that would prevent the Java GC from reclaiming the RDD's object in the
driver.

It might be possible to rig up a solution where RDD transformations return
some sort of facade to the driver such that only the user's driver code keeps
strong references to it (the underlying lineage chain would be built on the
RDDs behind the facade using strong references).  This would allow us to
discover when the RDD variable becomes inaccessible in the driver, and we
might be able to implement the cleanup code in the facade's finalizer, but the
cleanup might be delayed until the driver's GC runs (which might not happen
for a very long time).
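
Very roughly, the facade could look something like this (a hypothetical
sketch; CachedRDDHandle is not a real Spark class, and finalizers come with
the usual caveats about timing):

  import org.apache.spark.rdd.RDD

  // The driver code would hold only this handle; the lineage chain keeps its
  // own references to the underlying RDD, so dropping the handle doesn't
  // break recomputation.
  class CachedRDDHandle[T](private val underlying: RDD[T]) {
    def rdd: RDD[T] = underlying

    // Runs only when the driver's GC collects the handle, which may be long
    // after the last use (or never, if the driver exits first).
    override def finalize(): Unit = {
      underlying.unpersist()
    }
  }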

If we could perform a static analysis of the entire user program, we could
automatically insert unpersist() calls.
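
Until something like that exists, the practical workaround is to call
unpersist() yourself once a cached RDD is no longer needed.  Sketched against
the code you posted below (the unpersist() calls are the only addition):

  val stationWaterMarkDiffRDD = getWaterMarkDiffRDD(finalStateRDD).cache
  val stationsAboveHWM = stationWaterMarkDiffRDD
    .filter { case (id, nblwm, nahwm) => nahwm > 0 }
    .map { case (s, l, h) => (s, h) }
    .cache
  // ... run the actions that actually need the cached data ...
  stationsAboveHWM.unpersist()
  stationWaterMarkDiffRDD.unpersist()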


On Sun, Dec 8, 2013 at 10:16 AM, Meisam Fathi <[email protected]> wrote:

> I asked the same question of the Spark community a while ago
> (
> http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201311.mbox/%3CCAByMnGtm2s2tyqLzw%2BMdGqgNBLbfhE6-kkZ4OPY4ANfZaDSu7Q%40mail.gmail.com%3E
> ).
> This is my understanding of how Spark works, but I'd like one of the
> Spark maintainers to confirm it.
>
> There are only two ways to remove RDD data from the cache:
> 1) When programmers explicitly call unpersist() (or a similar API) on an RDD.
> 2) When Spark's cache is full, Spark automatically spills cached data
> to disk and frees up main memory.
>
> Spark uses an LRU policy for cache eviction, which means Spark will
> eventually remove cached data, but it may keep it in the cache longer
> than needed. In your case, the cache space used by stationsAboveHWM
> could safely be reclaimed as soon as the getMoveReco() method returns,
> but stationsAboveHWM will remain in the cache until it is evicted.
>
> Thanks,
> Meisam
>
> On Sun, Dec 8, 2013 at 12:05 PM, K. Shankari <[email protected]> wrote:
> > I have some local variables in a function that are generated by a shuffle
> > operation. To improve performance, I chose to cache() them, with the
> > assumption that they would be automatically removed from the cache when
> > they were deallocated.
> >
> > However, I am not sure that this is the case.
> >
> > I recently changed my code from cache()ing one local variable to
> > cache()ing 2-3, and I have consistently started running out of memory.
> >
> > Again, the underlying dataset is ~ 2MB, and while the source RDD for the
> > groupBy is fairly large, the cached variables are pretty small: ~ 10 rows
> > of (Int, Int) and (Int, Int, Int).
> >
> > Can any of the maintainers clarify whether RDDs are supposed to be
> > removed from the cache when their associated local variables are
> > deallocated?
> >
> > Here's my shuffle operation:
> >
> >   def getFinalState() = {
> >     def maxTs(ss1: StationStatus, ss2: StationStatus) = {
> >       if(ss1.ts > ss2.ts) ss1 else ss2
> >     }
> >     peer.map(ss => (ss.id, ss)).reduceByKey(maxTs)
> >   }
> >
> > and here's where I cache it:
> >
> > def getMoveReco(....) = {
> >       val finalStateRDD = stationStatusRDD.getFinalState
> >       val currEmptyStations =
> >         finalStateRDD.filter{case(stnId, ss) => ss.nBikes == 0}.cache
> >       val currFullStations =
> >         finalStateRDD.filter{case(stnId, ss) => ss.nEmpty == 0}.cache
> >       // this is of the form (id, nBikesBelowLWM, nBikesAboveHWM)
> >       val stationWaterMarkDiffRDD = getWaterMarkDiffRDD(finalStateRDD).cache
> >       val stationsAboveHWM = stationWaterMarkDiffRDD
> >         .filter{case(id, nblwm, nahwm) => nahwm > 0}
> >         .map{case(s, l, h) => (s, h)}.cache
> >       val stationsBelowLWM = stationWaterMarkDiffRDD
> >         .filter{case(id, nblwm, nahwm) => nblwm > 0}
> >         .map{case(s, l, h) => (s, l)}.cache
> >       val balancedStations = stationWaterMarkDiffRDD
> >         .filter{case(id, nblwm, nahwm) => nblwm <= 0 && nahwm <= 0}.cache
> > ...
> > }
> >
> > Thanks,
> > Shankari
>
