Re: Advantage of using cache()
Yep - that's correct. As an optimization we save the shuffle output and re-use it if you execute a stage twice. So this can make A/B tests like this a bit confusing.

- Patrick

On Friday, August 22, 2014, Nieyuan wrote:
> Because map-reduce tasks like join will save shuffle data to disk. So the
> only difference between the caching and no-caching versions is:
>
>> .map { case (x, (n, i)) => (x, n)}
>
> Thanks,
> Nieyuan
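To make the shuffle re-use concrete, here is a minimal sketch, not code from this thread. It assumes Spark 2.x or later (for `longAccumulator`) and local mode; the object name, accumulator names, and data sizes are made up for illustration. It counts how often a map function runs before and after a shuffle when two actions are executed on the same uncached RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; names and sizes are illustrative assumptions.
object ShuffleReuseDemo {
  // Runs two actions on one uncached RDD and returns
  // (map calls before the shuffle, map calls after the shuffle).
  def countCalls(sc: SparkContext): (Long, Long) = {
    val before = sc.longAccumulator("calls before shuffle")
    val after = sc.longAccumulator("calls after shuffle")

    val rdd = sc.parallelize(0 until 1000, 4)
      .map { x => before.add(1L); (x % 10, x) }     // part of the shuffle map stage
      .reduceByKey(_ + _)                           // shuffle boundary, 10 keys
      .map { case (k, v) => after.add(1L); (k, v) } // runs after the shuffle

    rdd.count() // first job: both stages run
    rdd.count() // second job: the saved shuffle output can be re-used
    (before.value.longValue, after.value.longValue)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("shuffle-reuse-demo")
    val sc = new SparkContext(conf)
    try {
      val (before, after) = countCalls(sc)
      println(s"map calls before shuffle: $before, after shuffle: $after")
    } finally {
      sc.stop()
    }
  }
}
```

In a local run without task retries, the pre-shuffle count should be 1000 (one pass) and the post-shuffle count 20 (10 keys, two passes), so only the step after the shuffle is repeated. Accumulator updates made inside transformations can be over-counted when tasks are retried, so treat the numbers as an illustration rather than a measurement.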
Re: Advantage of using cache()
Because map-reduce tasks like join will save shuffle data to disk, the only difference between the caching and no-caching versions is this step:

>> .map { case (x, (n, i)) => (x, n)}

Thanks,
Nieyuan

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Advantage-of-using-cache-tp12480p12634.html
Re: Advantage of using cache()
Hi,

thank you for your response. I removed the issues you mentioned. Now I read the RDDs from files, the whole rdd is cached, I don't use random, and rdd1 and rdd2 are identical. The RDDs that are joined contain 100k entries each, and the result contains 10m entries. rdd1 and rdd2 after the join also contain 10m entries. Here is the code:

val help = sc.textFile("input/tab1")
  .map{ s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt)}
  .repartition(100)
val rdd = sc.textFile("input/tab2")
  .map{ s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt)}
  .repartition(100)
  .join(help)
  .map { case (x, (n, i)) => (x, n)}
  .cache() // or without it

val rdd1 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
  .join(rdd).saveAsTextFile("output/1")
val rdd2 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
  .join(rdd).saveAsTextFile("output/2")

Files input/tab1 and input/tab2 were generated using this Python code:

for x in range(10):
    file1.write("%d\t%d\n" % (random.randint(0, 1000), x))
    file2.write("%d\t%d\n" % (random.randint(0, 1000), x))

When using cache, the whole rdd is cached. Its size is 362MB. The results are similar:

Without cache: stages used to compute rdd: 30s, rdd1: 16s, rdd2: 12s
With cache: stages used to compute rdd: 28s, rdd1: 14s, rdd2: 15s

I thought that without caching rdd, computing rdd2 would take much longer because rdd would be recomputed. But it seems that it doesn't work that way. Could you explain it or point to an example that shows the power of caching?

Thanks,
Grzegorz

On Wed, Aug 20, 2014 at 11:22 PM, Patrick Wendell wrote:
> Your rdd2 and rdd3 differ in two ways so it's hard to track the exact
> effect of caching. In rdd3, in addition to the fact that rdd will be
> cached, you are also doing a bunch of extra random number generation. So it
> will be hard to isolate the effect of caching.
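On the request for an example that shows the power of caching: one case where cache() should make a visible difference is a lineage with no shuffle in it, because then there is no saved shuffle output to fall back on and every action recomputes from the source. The following is a minimal sketch under the same assumptions as any small local test (Spark 2.x or later for `longAccumulator`, local mode); the object name, helper name, and sizes are hypothetical and not taken from this thread.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; names and sizes are illustrative assumptions.
object CacheDemo {
  // Runs two actions on a map-only RDD and returns how many times
  // the map function was invoked in total.
  def mapInvocations(sc: SparkContext, useCache: Boolean): Long = {
    val calls = sc.longAccumulator("map calls")
    val mapped = sc.parallelize(1 to 1000, 4)
      .map { x => calls.add(1L); x * 2 } // stands in for an expensive step
    val rdd = if (useCache) mapped.cache() else mapped

    rdd.count() // first action: computes every element
    rdd.count() // second action: recomputes unless the partitions are cached
    calls.value.longValue
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("cache-demo")
    val sc = new SparkContext(conf)
    try {
      println(s"without cache: ${mapInvocations(sc, useCache = false)} map calls")
      println(s"with cache:    ${mapInvocations(sc, useCache = true)} map calls")
    } finally {
      sc.stop()
    }
  }
}
```

In a local run without task retries or cache eviction, the uncached version should report about twice as many map calls as the cached one. The more expensive the map function, the larger the difference in wall-clock time is likely to be.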
Re: Advantage of using cache()
Your rdd2 and rdd3 differ in two ways, so it's hard to track the exact effect of caching. In rdd3, in addition to the fact that rdd will be cached, you are also doing a bunch of extra random number generation. So it will be hard to isolate the effect of caching.

On Wed, Aug 20, 2014 at 7:48 AM, Grzegorz Białek <grzegorz.bia...@codilime.com> wrote:
> Hi,
>
> I tried to write a small program that shows that using cache() can speed up
> execution, but the results with and without cache were similar. Could you
> help me with this issue? I tried to compute rdd and use it later in two
> places, and I thought that on the second use this rdd would be recomputed,
> but it isn't:
>
> val help = sc.parallelize(Array.range(1, 2)).repartition(100)
>   .map(x => (scala.util.Random.nextInt(10), x))
> val rdd = sc.parallelize(Array.range(1,2))
>   .repartition(100)
>   .map(x => (scala.util.Random.nextInt(10), x))
>   .join(help)
>   .map { case (x, (n, i)) => (x, n)}
>   .reduceByKey(_ + _)
>   .cache()
>
> val rdd2 = sc.parallelize(Array.range(1,1000)).map(x => (x, x))
>   .join(rdd).saveAsTextFile("output/1")
> val rdd3 = sc.parallelize(Array.range(1,1000)).map(x =>
>     (scala.util.Random.nextInt(1000), x))
>   .join(rdd).saveAsTextFile("output/2")
>
> Thanks,
> Grzegorz
Advantage of using cache()
Hi,

I tried to write a small program that shows that using cache() can speed up execution, but the results with and without cache were similar. Could you help me with this issue? I tried to compute rdd and use it later in two places, and I thought that on the second use this rdd would be recomputed, but it isn't:

val help = sc.parallelize(Array.range(1, 2)).repartition(100)
  .map(x => (scala.util.Random.nextInt(10), x))
val rdd = sc.parallelize(Array.range(1,2))
  .repartition(100)
  .map(x => (scala.util.Random.nextInt(10), x))
  .join(help)
  .map { case (x, (n, i)) => (x, n)}
  .reduceByKey(_ + _)
  .cache()

val rdd2 = sc.parallelize(Array.range(1,1000)).map(x => (x, x))
  .join(rdd).saveAsTextFile("output/1")
val rdd3 = sc.parallelize(Array.range(1,1000)).map(x =>
    (scala.util.Random.nextInt(1000), x))
  .join(rdd).saveAsTextFile("output/2")

Thanks,
Grzegorz