Thanks Mark, that makes perfect sense.
I guess I still don't have a full picture in my head when it comes to
caching:
How is the RDD cache managed (assuming there is not enough memory for
all the cached RDDs): is it LRU, LFU, or something else?
Thanks,
Yadid
On 11/30/13 10:56 PM, Mark Hamstra wrote:
If you're not performing any action until the very end, then there is
no need to cache at all.
Observe (and don't make the mistake of thinking that my code
represents best practice rather than just an illustration):
scala> val rdd0 = sc.parallelize(1 to 4, 2)
rdd0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> var rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> def starsPlusOne(idx: Int, itr: Iterator[Int]) = {
| println("*" * (idx + 1))
| itr.map(_ + 1)
| }
starsPlusOne: (idx: Int, itr: Iterator[Int])Iterator[Int]
scala> rdd.mapPartitionsWithIndex(starsPlusOne, true)
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
mapPartitionsWithIndex at <console>:19
scala> rdd.collect
res1: Array[Int] = Array(1, 2, 3, 4)
scala> rdd = res0
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
mapPartitionsWithIndex at <console>:19
scala> rdd.collect
*
**
res2: Array[Int] = Array(2, 3, 4, 5)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at
mapPartitionsWithIndex at <console>:18
scala> rdd.collect
*
**
res3: Array[Int] = Array(2, 3, 4, 5)
scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at
mapPartitionsWithIndex at <console>:18
scala> rdd.collect
*
*
**
**
res4: Array[Int] = Array(3, 4, 5, 6)
scala> rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at
mapPartitionsWithIndex at <console>:18
scala> rdd.collect
*
*
*
**
**
**
res5: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) { rdd =
rdd.mapPartitionsWithIndex(starsPlusOne, true) }
scala> rdd.collect
*
*
*
**
**
**
res7: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| }
scala> rdd.collect
*
*
*
**
**
**
res9: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| println(rdd.sum)
| }
*
**
14.0
*
**
18.0
*
**
22.0
scala> rdd.collect
res11: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> var temp = rdd0
temp: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| temp = rdd
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| temp.unpersist()
| println(rdd.sum)
| }
*
**
14.0
*
*
**
**
18.0
*
*
*
**
**
**
22.0
scala> rdd.collect
res13: Array[Int] = Array(4, 5, 6, 7)
scala> rdd = rdd0
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at
parallelize at <console>:12
scala> for (_ <- 1 to 3) {
| temp = rdd
| rdd = rdd.mapPartitionsWithIndex(starsPlusOne, true)
| rdd.cache()
| println(rdd.sum)
| temp.unpersist()
| }
*
**
14.0
*
**
18.0
*
**
22.0
scala> rdd.collect
res14: Array[Int] = Array(4, 5, 6, 7)
Take-aways:
1. spark-shell is your friend
2. assigning an RDD to a var doesn't modify the underlying
collection; the var is just a reference to the specified
lineage of transformations over the underlying collection
3. cache() doesn't really do anything useful unless more than one
action is performed on the underlying collection
4. performing gratuitous actions within a loop just so that calling
cache() within the loop will have some immediate effect will
usually not change the total amount of work done outside the
gratuitous job vs. just doing one action after the looped
transformations
5. trying to unpersist() prior cached iterations can work, but it is
sensitive to where it occurs relative to actions
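Take-away 3 can be seen without a cluster. Here is a plain-Scala analogy (not the Spark API): a function returning a fresh Iterator plays the role of an uncached RDD, an eagerly materialized Vector plays the role of a cached one, and a counter tracks how often the map function actually runs.

```scala
object CacheAnalogy {
  def main(args: Array[String]): Unit = {
    var computations = 0

    // Like an uncached RDD: every traversal recomputes the map.
    def uncached: Iterator[Int] =
      (1 to 4).iterator.map { x => computations += 1; x + 1 }

    uncached.sum               // first "action": 4 computations
    uncached.sum               // second "action": 4 more
    assert(computations == 8)  // the work was done twice

    // "Caching" = materializing once, then reusing the result.
    computations = 0
    val cached = (1 to 4).map { x => computations += 1; x + 1 } // eager Vector
    cached.sum
    cached.sum
    assert(computations == 4)  // one round of work serves both actions
  }
}
```

With a single action, both variants do the map exactly once, which is why cache() buys nothing in that case.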
On Sat, Nov 30, 2013 at 6:39 PM, Yadid Ayzenberg
<[email protected]> wrote:
step 4. would be count(), or collect(). The map() (in step 2.)
would be performing calculations and writing information to a DB.
Is this the information that was missing?
Thanks,
Yadid
On 11/30/13 9:24 PM, Mark Hamstra wrote:
Your question doesn't really make any sense without specifying
where any RDD actions take place (i.e. where Spark jobs are
actually run). Without any actions, all you've outlined so far
are different ways to specify the chain of transformations that
should be evaluated when an action is eventually called and a job
runs. In a real sense your code hasn't actually done anything yet.
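The same laziness can be reproduced with plain Scala iterators, no Spark needed (an analogy, not the Spark API): building the pipeline of transformations does no work, and only forcing the traversal, the equivalent of an action, runs the map function.

```scala
object LazinessDemo {
  def main(args: Array[String]): Unit = {
    val log = scala.collection.mutable.Buffer[Int]()

    // Chaining a transformation: nothing executes yet.
    val pipeline = (1 to 4).iterator.map { x => log += x; x * 2 }
    assert(log.isEmpty)           // no work has been done so far

    val result = pipeline.toList  // the "action": forces evaluation
    assert(result == List(2, 4, 6, 8))
    assert(log == Seq(1, 2, 3, 4))
  }
}
```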
On Sat, Nov 30, 2013 at 6:01 PM, Yadid Ayzenberg
<[email protected]> wrote:
Hi All,
I'm trying to implement the following and would like to know
in which places I should be calling RDD.cache():
Suppose I have a group of RDDs : RDD1 to RDDn as input.
1. create a single RDD_total = RDD1.union(RDD2)..union(RDDn)
2. for i = 0 to x: RDD_total = RDD_total.map (some map
function());
3. return RDD_total.
I think that I should cache RDD_total in order to optimize the
iterations. Should I just be calling RDD_total.cache() at the
end of each iteration? Or should I be performing something
more elaborate:
RDD_temp = RDD_total.map (some map function());
RDD_total.unpersist();
RDD_total = RDD_temp.cache();
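Written out as a sketch (assuming a live SparkContext, e.g. spark-shell's sc, and with someMapFunction as a placeholder for the real computation), the elaborate variant looks like this:

```scala
import org.apache.spark.rdd.RDD

// Sketch only, not tested against a cluster.
def iterate(inputs: Seq[RDD[Int]], x: Int)(someMapFunction: Int => Int): RDD[Int] = {
  var total = inputs.reduce(_ union _)  // step 1: RDD_total = RDD1 union ... union RDDn
  for (_ <- 1 to x) {                   // step 2: x iterations of map
    val temp = total
    total = total.map(someMapFunction).cache()
    // Note: unless an action runs inside the loop, the cache()/unpersist()
    // calls here only record intent; with a single action at the very end,
    // nothing is ever actually cached or dropped.
    temp.unpersist()
  }
  total                                 // step 3: return RDD_total
}
```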
Thanks,
Yadid