LRU. See org.apache.spark.storage.MemoryStore#ensureFreeSpace -- it just iterates through entries in order (i.e. the order in which blocks were added to the cache).
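For the curious, here is a minimal sketch of that in-order eviction scan. This is illustrative only, not Spark's actual code: the names (EvictionSketch, put), the 100-byte budget, and the blockId-to-size map are all made up for the example.

    import scala.collection.mutable.{LinkedHashMap, ListBuffer}

    // Illustrative only. A LinkedHashMap iterates in insertion order, so
    // scanning from the front visits the blocks added to the cache earliest.
    object EvictionSketch {
      val maxBytes  = 100L
      var usedBytes = 0L
      val entries   = LinkedHashMap.empty[String, Long]  // blockId -> size in bytes

      // Evict blocks in iteration order until `needed` bytes would fit.
      def ensureFreeSpace(needed: Long): Unit = {
        val toEvict = ListBuffer.empty[String]
        var freed = 0L
        val it = entries.iterator
        while (usedBytes - freed + needed > maxBytes && it.hasNext) {
          val (blockId, size) = it.next()
          toEvict += blockId
          freed += size
        }
        // Remove after the scan so the map is never mutated mid-iteration.
        for (blockId <- toEvict)
          entries.remove(blockId).foreach(size => usedBytes -= size)
      }

      def put(blockId: String, size: Long): Unit = {
        ensureFreeSpace(size)
        entries(blockId) = size
        usedBytes += size
      }
    }

The real method is more careful (for instance, it avoids evicting blocks belonging to the same RDD that is currently being stored), but the core idea is the same in-order scan.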
On Mon, Dec 2, 2013 at 9:08 PM, Yadid Ayzenberg <[email protected]> wrote:

> Thanks Mark, that makes perfect sense.
> I guess I still don't have a full picture in my head when it comes to the
> caching: how is the RDD cache managed (assuming not enough memory for all
> the cached RDDs): is it LRU or LFU, or something else?
>
> Thanks,
>
> Yadid
>
>
> On 11/30/13 10:56 PM, Mark Hamstra wrote:
>
> If you're not performing any action until the very end, then there is no
> need to cache at all.
>
> Observe (and don't make the mistake of thinking that my code represents
> best practice and not just an illustration):
>
> scala> val rdd0 = sc.parallelize(1 to 4, 2)
> rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> var rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
>      |   println("*" * (idx + 1))
>      |   itr.map(_ + 1)
>      | }
> starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]
>
> scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
> res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
> mapPartitionsWithIndex at <console>:19
>
> scala> rdd.collect
> res1: Array[Int] = Array(1, 2, 3, 4)
>
> scala> rdd = res0
> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
> mapPartitionsWithIndex at <console>:19
>
> scala> rdd.collect
> *
> **
> res2: Array[Int] = Array(2, 3, 4, 5)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
> mapPartitionsWithIndex at <console>:18
>
> scala> rdd.collect
> *
> **
> res3: Array[Int] = Array(2, 3, 4, 5)
>
> scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
> mapPartitionsWithIndex at <console>:18
>
> scala> rdd.collect
> *
> *
> **
> **
> res4: Array[Int] = Array(3, 4, 5, 6)
>
> scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
> mapPartitionsWithIndex at <console>:18
>
> scala> rdd.collect
> *
> *
> *
> **
> **
> **
> res5: Array[Int] = Array(4, 5, 6, 7)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> for (_ <- 1 to 3) { rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true) }
>
> scala> rdd.collect
> *
> *
> *
> **
> **
> **
> res7: Array[Int] = Array(4, 5, 6, 7)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> for (_ <- 1 to 3) {
>      |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>      |   rdd.cache()
>      | }
>
> scala> rdd.collect
> *
> *
> *
> **
> **
> **
> res9: Array[Int] = Array(4, 5, 6, 7)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> for (_ <- 1 to 3) {
>      |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>      |   rdd.cache()
>      |   println(rdd.sum)
>      | }
> *
> **
> 14.0
> *
> **
> 18.0
> *
> **
> 22.0
>
> scala> rdd.collect
> res11: Array[Int] = Array(4, 5, 6, 7)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> var temp = rdd0
> temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> for (_ <- 1 to 3) {
>      |   temp = rdd
>      |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>      |   rdd.cache()
>      |   temp.unpersist()
>      |   println(rdd.sum)
>      | }
> *
> **
> 14.0
> *
> *
> **
> **
> 18.0
> *
> *
> *
> **
> **
> **
> 22.0
>
> scala> rdd.collect
> res13: Array[Int] = Array(4, 5, 6, 7)
>
> scala> rdd = rdd0
> rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
> parallelize at <console>:12
>
> scala> for (_ <- 1 to 3) {
>      |   temp = rdd
>      |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
>      |   rdd.cache()
>      |   println(rdd.sum)
>      |   temp.unpersist()
>      | }
> *
> **
> 14.0
> *
> **
> 18.0
> *
> **
> 22.0
>
> scala> rdd.collect
> res14: Array[Int] = Array(4, 5, 6, 7)
>
>
> Take-aways:
>
> 1. spark-shell is your friend
> 2. assigning an RDD to a var doesn't modify the underlying collection;
>    the var is more just a pointer into the specified lineage of
>    transformations for the underlying collection
> 3. cache() doesn't really do anything useful unless more than one
>    action is performed on the underlying collection
> 4. performing gratuitous actions within a loop just so that calling
>    cache() within the loop will have some immediate effect will usually
>    not change the total amount of work done outside the gratuitous job
>    vs. just doing one action after the looped transformations
> 5. trying to unpersist() prior cached iterations can work, but it is
>    sensitive to where it occurs relative to actions
>
>
> On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg <[email protected]> wrote:
>
>> step 4. would be count(), or collect(). The map() (in step 2.) would
>> be performing calculations and writing information to a DB.
>>
>> Is this the information that was missing?
>>
>> Thanks,
>>
>> Yadid
>>
>>
>> On 11/30/13 9:24 PM, Mark Hamstra wrote:
>>
>> Your question doesn't really make any sense without specifying where any
>> RDD actions take place (i.e. where Spark jobs are actually run). Without
>> any actions, all you've outlined so far are different ways to specify the
>> chain of transformations that should be evaluated when an action is
>> eventually called and a job runs. In a real sense your code hasn't
>> actually done anything yet.
>>
>>
>> On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I'm trying to implement the following and would like to know in which
>>> places I should be calling RDD.cache():
>>>
>>> Suppose I have a group of RDDs, RDD1 to RDDn, as input.
>>>
>>> 1. create a single RDD_total = RDD1.union(RDD2)...union(RDDn)
>>>
>>> 2. for i = 0 to x: RDD_total = RDD_total.map(some map function());
>>>
>>> 3. return RDD_total.
>>>
>>> I think that I should cache RDD_total in order to optimize the
>>> iterations. Should I just be calling RDD_total.cache() at the end of
>>> each iteration? Or should I be performing something more elaborate:
>>>
>>> RDD_temp = RDD_total.map(some map function());
>>> RDD_total.unpersist();
>>> RDD_total = RDD_temp.cache();
>>>
>>> Thanks,
>>> Yadid
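For reference, the unpersist-after-the-action pattern from take-away 5 above can be condensed to a few lines. This is a sketch, not canonical Spark usage: it assumes a live SparkContext `sc` (as in spark-shell), and the trivial `_ + 1` map stands in for real per-iteration work:

    // Cache the new iteration, run the action that needs it, and only
    // then drop the previous iteration's cached copy.
    var rdd  = sc.parallelize(1 to 4, 2)
    var prev = rdd
    for (_ <- 1 to 3) {
      prev = rdd
      rdd = rdd.map(_ + 1).cache() // mark the new iteration for caching
      println(rdd.sum)             // action: materializes (and caches) rdd
      prev.unpersist()             // safe now: rdd is cached, so later
                                   // actions won't recompute prev
    }
    println(rdd.collect.mkString(", ")) // served from the final cached iteration

Note that unpersist() on a never-cached RDD (prev in the first iteration) is a harmless no-op, and that moving the unpersist() above the action reintroduces the recomputation shown in the res13 transcript above.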
