Thanks Mark, that makes perfect sense.
I guess I still don't have a full picture in my head when it comes to caching: how is the RDD cache managed when there isn't enough memory for all the cached RDDs? Is the eviction policy LRU, LFU, or something else?


Thanks,

Yadid


On 11/30/13 10:56 PM, Mark Hamstra wrote:
If you're not performing any action until the very end, then there is no need to cache at all.

Observe (and don't mistake the code below for best practice; it is just an illustration):

    scala> val rdd0 = sc.parallelize(1 to 4, 2)
    rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> var rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
         |   println("*" * (idx + 1))
         |   itr.map(_ + 1)
         | }
    starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]

    scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
    res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
    mapPartitionsWithIndex at <console>:19

    scala> rdd.collect
    res1: Array[Int] = Array(1, 2, 3, 4)

    scala> rdd = res0
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
    mapPartitionsWithIndex at <console>:19

    scala> rdd.collect
    *
    **
    res2: Array[Int] = Array(2, 3, 4, 5)

    scala> rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
    mapPartitionsWithIndex at <console>:18

    scala> rdd.collect
    *
    **
    res3: Array[Int] = Array(2, 3, 4, 5)

    scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
    mapPartitionsWithIndex at <console>:18

    scala> rdd.collect
    *
    *
    **
    **
    res4: Array[Int] = Array(3, 4, 5, 6)

    scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
    rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
    mapPartitionsWithIndex at <console>:18

    scala> rdd.collect
    *
    *
    *
    **
    **
    **
    res5: Array[Int] = Array(4, 5, 6, 7)

    scala> rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> for (_ <- 1 to 3) { rdd =
    rdd.mapPartitionsWithIndex(starsPlusOne, true) }

    scala> rdd.collect
    *
    *
    *
    **
    **
    **
    res7: Array[Int] = Array(4, 5, 6, 7)

    scala> rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> for (_ <- 1 to 3) {
         |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
         |   rdd.cache()
         | }

    scala> rdd.collect
    *
    *
    *
    **
    **
    **
    res9: Array[Int] = Array(4, 5, 6, 7)

    scala> rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> for (_ <- 1 to 3) {
         |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
         |   rdd.cache()
         |   println(rdd.sum)
         | }
    *
    **
    14.0
    *
    **
    18.0
    *
    **
    22.0

    scala> rdd.collect
    res11: Array[Int] = Array(4, 5, 6, 7)

    scala> rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> var temp = rdd0
    temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> for (_ <- 1 to 3) {
         |   temp = rdd
         |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
         |   rdd.cache()
         |   temp.unpersist()
         |   println(rdd.sum)
         | }
    *
    **
    14.0
    *
    *
    **
    **
    18.0
    *
    *
    *
    **
    **
    **
    22.0

    scala> rdd.collect

    res13: Array[Int] = Array(4, 5, 6, 7)

    scala> rdd = rdd0
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
    parallelize at <console>:12

    scala> for (_ <- 1 to 3) {
         |   temp = rdd
         |   rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
         |   rdd.cache()
         |   println(rdd.sum)
         |   temp.unpersist()
         | }
    *
    **
    14.0
    *
    **
    18.0
    *
    **
    22.0

    scala> rdd.collect
    res14: Array[Int] = Array(4, 5, 6, 7)



Take-aways:

 1. spark-shell is your friend
 2. assigning an RDD to a var doesn't modify the underlying
    collection; the var is just a reference to a particular lineage
    of transformations over that collection
 3. cache() doesn't really do anything useful unless more than one
    action is performed on the underlying collection
 4. performing gratuitous actions within a loop, just so that calling
    cache() inside the loop has some immediate effect, usually won't
    reduce the total amount of work compared with doing a single
    action after the looped transformations
 5. trying to unpersist() prior cached iterations can work, but it is
    sensitive to where it occurs relative to actions (see the sketch
    below)
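
To make take-aways 3-5 concrete, here is roughly the shape I would aim
for: a minimal sketch for spark-shell (where sc is already defined), with
expensiveStep as a hypothetical stand-in for your real transformation:

    // hypothetical per-element work standing in for the real computation
    def expensiveStep(x: Int): Int = x + 1

    var rdd = sc.parallelize(1 to 4, 2)

    // chain the transformations lazily; no job runs inside this loop
    for (_ <- 1 to 3) {
      rdd = rdd.map(expensiveStep)
    }

    // cache() pays off only because more than one action follows
    rdd.cache()
    println(rdd.sum)    // first action: runs the job and populates the cache
    println(rdd.count)  // second action: reuses the cached partitions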



On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg <[email protected]> wrote:

    Step 4 would be count() or collect(). The map() in step 2 would be performing calculations and writing information to a DB.

    Is this the information that was missing ?

    Thanks,

    Yadid






    On 11/30/13 9:24 PM, Mark Hamstra wrote:
    Your question doesn't really make sense without specifying where
    any RDD actions take place (i.e., where Spark jobs actually run).
    Without any actions, all you've outlined so far are different ways
    to specify the chain of transformations that will be evaluated
    when an action is eventually called and a job runs. In a real
    sense, your code hasn't actually done anything yet.
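
    For instance, a minimal sketch (again for spark-shell, where sc is
    already defined; in local mode the printlns show up right in the
    shell):

        val lazyRdd = sc.parallelize(1 to 4).map { x =>
          println("computing " + x)  // prints only when an action runs
          x * 2
        }
        // no Spark job has run yet; lazyRdd just describes the work
        lazyRdd.count()  // this action finally triggers a job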


    On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg
    <[email protected]> wrote:




        Hi All,

        I'm trying to implement the following and would like to know
        where I should be calling RDD.cache():

        Suppose I have a group of RDDs : RDD1 to RDDn as input.

        1. create a single RDD_total = RDD1.union(RDD2)...union(RDDn)

        2. for i = 0 to x: RDD_total = RDD_total.map(some map function());

        3. return RDD_total.

        I assume that I should cache RDD_total in order to optimize
        the iterations. Should I just be calling RDD_total.cache() at
        the end of each iteration, or should I be performing something
        more elaborate:


        RDD_temp = RDD_total.map(some map function());
        RDD_total.unpersist();
        RDD_total = RDD_temp.cache();
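
        Spelled out as a full loop, I mean roughly the following (a
        sketch; someMapFunction stands in for my real map logic, and x
        for the number of iterations):

            var RDD_total = RDD1.union(RDD2)  // ...union(RDDn) over all inputs
            for (_ <- 1 to x) {
              val RDD_temp = RDD_total.map(someMapFunction)
              RDD_temp.cache()       // cache the new iteration
              RDD_total.unpersist()  // drop the previous one
              RDD_total = RDD_temp
            }
            RDD_total  // returned; count()/collect() would run later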



        Thanks,
        Yadid









