Re: Advantage of using cache()

2014-08-23 Thread Patrick Wendell
Yep - that's correct. As an optimization we save the shuffle output and
re-use it if you execute a stage twice. So this can make A/B tests like
this a bit confusing.
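
For an example where cache() does make a visible difference, the reused RDD
has to sit behind work that is not covered by saved shuffle output. Below is a
minimal sketch, not taken from the original program: it counts how often a map
function runs across two actions, with and without cache(). The names
CacheDemo and mapCalls are made up for illustration, local[2] is an arbitrary
master setting, and sc.longAccumulator assumes Spark 2.0 or later (newer than
the version discussed in this thread).

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; not part of the original thread.
object CacheDemo {
  // Runs two actions on a mapped RDD and returns how many times
  // the map function was invoked in total.
  def mapCalls(sc: SparkContext, useCache: Boolean, n: Int = 1000): Long = {
    val calls = sc.longAccumulator("mapCalls")
    val mapped = sc.parallelize(1 to n, 4).map { x => calls.add(1L); x * 2 }
    val rdd = if (useCache) mapped.cache() else mapped
    rdd.count()        // first action: runs the map (and fills the cache if enabled)
    rdd.reduce(_ + _)  // second action: re-runs the map unless partitions are cached
    rdd.unpersist()
    calls.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CacheDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(s"map calls without cache: ${mapCalls(sc, useCache = false)}")
      println(s"map calls with cache:    ${mapCalls(sc, useCache = true)}")
    } finally sc.stop()
  }
}
```

In local mode with no task retries, the uncached run should report about twice
as many map calls as the cached one, since the second action re-runs the map.
Accumulator updates made inside transformations can be applied more than once
if tasks are retried, so treat the counts as indicative.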

- Patrick



Re: Advantage of using cache()

2014-08-22 Thread Nieyuan
Because map-reduce tasks like join save their shuffle data to disk, the
only difference between the cached and uncached versions is whether this
step is re-run:
  .map { case (x, (n, i)) => (x, n) }
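
To see where the shuffle boundary falls, one option is to print the RDD
lineage with toDebugString. This is a rough sketch that assumes a small
in-memory dataset rather than the files from the thread; LineageDemo and
lineage are hypothetical names, and the exact output format varies between
Spark versions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; not part of the original thread.
object LineageDemo {
  // Builds a small join-then-map pipeline shaped like the one discussed
  // here and returns its lineage as text.
  def lineage(sc: SparkContext): String = {
    val left = sc.parallelize(0 until 100).map(x => (x % 10, x))
    val right = sc.parallelize(0 until 100).map(x => (x % 10, x))
    val joined = left.join(right).map { case (k, (n, _)) => (k, n) }
    joined.toDebugString
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LineageDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(lineage(sc))
    finally sc.stop()
  }
}
```

In the printed lineage, entries below an indented `+-` marker sit behind a
shuffle; those are the parts whose saved shuffle output Spark can reuse
instead of recomputing.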



Thanks,
Nieyuan
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Advantage-of-using-cache-tp12480p12634.html



Re: Advantage of using cache()

2014-08-21 Thread Grzegorz Białek
Hi,
thank you for your response. I removed the issues you mentioned. Now I read
the RDDs from files, the whole rdd is cached, I don't use random, and rdd1
and rdd2 are identical.
The RDDs that are joined contain 100k entries and the result contains 10m
entries. rdd1 and rdd2 after the join also contain 10m entries. Here is the
code:

  val help = sc.textFile("input/tab1")
    .map { s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt) }
    .repartition(100)
  val rdd = sc.textFile("input/tab2")
    .map { s => val pair = s.split("\t"); (pair(0).toInt, pair(1).toInt) }
    .repartition(100)
    .join(help)
    .map { case (x, (n, i)) => (x, n) }
    .cache() // or without it

  val rdd1 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/1")
  val rdd2 = sc.parallelize(Array.range(0, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/2")

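The files input/tab1 and input/tab2 were generated using this Python code:
import random

# Assuming file1 is input/tab1 and file2 is input/tab2.
with open("input/tab1", "w") as file1, open("input/tab2", "w") as file2:
    for x in range(10):
        file1.write("%d\t%d\n" % (random.randint(0, 1000), x))
        file2.write("%d\t%d\n" % (random.randint(0, 1000), x))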

When using cache, the whole rdd is cached. Its size is 362MB.
The results are similar:
Without cache: stages used to compute rdd: 30s, rdd1: 16s, rdd2: 12s
With cache: stages used to compute rdd: 28s, rdd1: 14s, rdd2: 15s

I thought that without caching rdd, computing rdd2 would take much longer
because rdd would have to be recomputed. But it seems that it doesn't work
that way. Could you explain why, or point to an example that shows the power
of caching?

Thanks,
Grzegorz




Re: Advantage of using cache()

2014-08-20 Thread Patrick Wendell
Your rdd2 and rdd3 differ in two ways so it's hard to track the exact
effect of caching. In rdd3, in addition to the fact that rdd will be
cached, you are also doing a bunch of extra random number generation. So it
will be hard to isolate the effect of caching.




Advantage of using cache()

2014-08-20 Thread Grzegorz Białek
Hi,

I tried to write a small program to show that using cache() can speed up
execution, but the results with and without cache were similar. Could you
help me with this issue? I compute rdd and use it later in two places, and I
thought it would be recomputed on the second use, but it isn't:

  val help = sc.parallelize(Array.range(1, 2)).repartition(100)
    .map(x => (scala.util.Random.nextInt(10), x))
  val rdd = sc.parallelize(Array.range(1, 2))
    .repartition(100)
    .map(x => (scala.util.Random.nextInt(10), x))
    .join(help)
    .map { case (x, (n, i)) => (x, n) }
    .reduceByKey(_ + _)
    .cache()

  val rdd2 = sc.parallelize(Array.range(1, 1000)).map(x => (x, x))
    .join(rdd).saveAsTextFile("output/1")
  val rdd3 = sc.parallelize(Array.range(1, 1000))
    .map(x => (scala.util.Random.nextInt(1000), x))
    .join(rdd).saveAsTextFile("output/2")

Thanks,
Grzegorz