If you're not performing any action until the very end, then there is no
need to cache at all.
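A minimal sketch of when cache() does and doesn't pay off (illustrative only; names are made up, and this assumes a spark-shell session where `sc` is already available):

```scala
// Hypothetical example, not from the transcript below.
val data = sc.parallelize(1 to 1000000, 4)
val transformed = data.map(_ * 2)

// Single action at the very end: no caching needed -- the lineage is
// evaluated exactly once when the job runs.
transformed.sum()

// Two or more actions over the same RDD: cache() avoids recomputing
// the map() for each action.
transformed.cache()
transformed.sum()   // first action materializes and caches the partitions
transformed.count() // second action reads the cached partitions
```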
Observe (and note that my code is just an illustration, not an example of
best practice):
scala> val rdd0 = sc.parallelize(1 to 4, 2)
rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> var rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
| println("*" * (idx + 1))
| itr.map(_ + 1)
| }
starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]
scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
mapPartitionsWithIndex at <console>:19
scala> rdd.collect
res1: Array[Int] = Array(1, 2, 3, 4)
scala> rdd = res0
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
mapPartitionsWithIndex at <console>:19
scala> rdd.collect
*
**
res2: Array[Int] = Array(2, 3, 4, 5)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
mapPartitionsWithIndex at <console>:18
scala> rdd.collect
*
**
res3: Array[Int] = Array(2, 3, 4, 5)
scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
mapPartitionsWithIndex at <console>:18
scala> rdd.collect
*
*
**
**
res4: Array[Int] = Array(3, 4, 5, 6)
scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
mapPartitionsWithIndex at <console>:18
scala> rdd.collect
*
*
*
**
**
**
res5: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) { rdd = rdd.mapPartitionsWithIndex(starsPlusOne,
true) }
scala> rdd.collect
*
*
*
**
**
**
res7: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| }
scala> rdd.collect
*
*
*
**
**
**
res9: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| println(rdd.sum)
| }
*
**
14.0
*
**
18.0
*
**
22.0
scala> rdd.collect
res11: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> var temp = rdd0
temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| temp = rdd
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| temp.unpersist()
| println(rdd.sum)
| }
*
**
14.0
*
*
**
**
18.0
*
*
*
**
**
**
22.0
scala> rdd.collect
res13: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| temp = rdd
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| println(rdd.sum)
| temp.unpersist()
| }
*
**
14.0
*
**
18.0
*
**
22.0
scala> rdd.collect
res14: Array[Int] = Array(4, 5, 6, 7)
Take-aways:
1. spark-shell is your friend
2. assigning an RDD to a var doesn't modify the underlying collection;
the var is just a reference to a particular point in the lineage of
transformations over the underlying collection
3. cache() doesn't do anything useful unless more than one action is
performed on the underlying collection
4. performing a gratuitous action within a loop just so that calling
cache() there takes effect immediately usually won't reduce the total
amount of work done outside that gratuitous job vs. just running one
action after the looped transformations
5. calling unpersist() on prior cached iterations can work, but the
result is sensitive to where the call occurs relative to actions
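The pattern from take-away 5 can be distilled into a short sketch (illustrative only; it assumes a spark-shell session where `sc` is available, and uses a plain map() in place of starsPlusOne):

```scala
// Iteratively transform, caching each iteration and unpersisting the
// previous one only AFTER the action has run, so the prior cached
// partitions are still available while the new iteration is computed.
var rdd = sc.parallelize(1 to 4, 2)
var prev = rdd
for (_ <- 1 to 3) {
  prev = rdd
  rdd = rdd.map(_ + 1)
  rdd.cache()
  println(rdd.sum())  // action: materializes rdd into the cache
  prev.unpersist()    // safe now; the new iteration no longer needs it
}
```

Unpersisting before the action (as in the transcript above) forces each iteration to recompute from the start of the lineage, which is why the star output doubled and tripled there.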
On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg <[email protected]>wrote:
> step 4. would be count(), or collect(). The map() (in step 2.) would be
> performing calculations and writing information to a DB.
>
> Is this the information that was missing?
>
> Thanks,
>
> Yadid
>
> On 11/30/13 9:24 PM, Mark Hamstra wrote:
>
> Your question doesn't really make any sense without specifying where any
> RDD actions take place (i.e., where Spark jobs are actually run). Without
> any actions, all you've outlined so far are different ways to specify the
> chain of transformations that should be evaluated when an action is
> eventually called and a job runs. In a real sense your code hasn't
> actually done anything yet.
>
>
> On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg <[email protected]>wrote:
>
>>
>> Hi All,
>>
>> I'm trying to implement the following and would like to know in which
>> places I should be calling RDD.cache():
>>
>> Suppose I have a group of RDDs : RDD1 to RDDn as input.
>>
>> 1. create a single RDD_total = RDD1.union(RDD2)..union(RDDn)
>>
>> 2. for i = 0 to x: RDD_total = RDD_total.map (some map function());
>>
>> 3. return RDD_total.
>>
>> I think that I should cache RDD_total in order to optimize the iterations.
>> Should I just be calling RDD_total.cache() at the end of each iteration ?
>> or should I be performing something more elaborate:
>>
>>
>> RDD_temp = RDD_total.map (some map function());
>> RDD_total.unpersist();
>> RDD_total = RDD_temp.cache();
>>
>>
>>
>> Thanks,
>> Yadid
>>
>
>