Hi Nathan,

I think there are two possible reasons for this. One is that even though you 
are caching RDDs, their lineage chain gets longer with every iteration, so 
serializing each RDD takes more time. You can cut off the chain by calling 
RDD.checkpoint() periodically, say every 5-10 iterations (you need to set a 
checkpoint directory with sc.setCheckpointDir() first). The second reason may 
just be garbage accumulating in the JVM, so that more and more time goes to 
collection as the job runs.
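
For what it's worth, here is a rough sketch of the periodic checkpoint, meant 
for spark-shell. The helper name iterateWithCheckpoint, its parameters and the 
/tmp path are made up for illustration; the only Spark calls are 
setCheckpointDir, cache, checkpoint, count and unpersist. I have not timed it 
against your test, so treat it as a starting point rather than a fix.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper: applies `step` `iterations` times and truncates the
// lineage every `interval` iterations by checkpointing.
def iterateWithCheckpoint[T](initial: RDD[T], iterations: Int, interval: Int)
                            (step: RDD[T] => RDD[T]): RDD[T] = {
  var current = initial
  for (i <- 1 to iterations) {
    val next = step(current).cache()
    // checkpoint() only marks the RDD; it has to be called before the
    // first action runs on it.
    if (i % interval == 0) next.checkpoint()
    // The action fills the cache and, if marked, writes the checkpoint.
    next.count()
    // Release the previous iteration, but leave the caller's input alone.
    if (current ne initial) current.unpersist(false)
    current = next
  }
  current
}

// Usage in spark-shell, where `sc` already exists.
sc.setCheckpointDir("/tmp/spark-checkpoints") // placeholder path
val result = iterateWithCheckpoint(sc.parallelize(1 to 1000),
                                   iterations = 100, interval = 10)(_.map(_ + 1))
```

To see whether the chain is actually being cut, you can print 
result.toDebugString before and after a checkpointed iteration and compare 
the depth of the lineage.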

Matei

On Jul 11, 2014, at 6:54 AM, Nathan Kronenfeld <nkronenf...@oculusinfo.com> 
wrote:

> Hi, folks.
> 
> We're having a problem with iteration that I don't understand.
> 
> We have the following test code:
> 
> org.apache.log4j.Logger.getLogger("org").setLevel(org.apache.log4j.Level.WARN)
> org.apache.log4j.Logger.getLogger("akka").setLevel(org.apache.log4j.Level.WARN)
> 
> def test (caching: Boolean, points: Int, iterations: Int) {
>       var coords = sc.parallelize(Array.fill(points)(0.0, 0.0).zipWithIndex.map(_.swap))
>       if (caching) coords.cache
>       coords.count
> 
>       var iteration = 0
>       val times = new Array[Double](iterations)
> 
>       do {
>               val start = System.currentTimeMillis
>               val thisIteration = iteration
>               val increments = sc.parallelize(for (i <- 1 to points) yield (math.random, math.random))
>               val newcoords = coords.zip(increments).map(p =>
>                       {
>                               if (0 == p._1._1) println("Processing iteration "+thisIteration)
>                               (p._1._1,
>                                (p._1._2._1 + p._2._1,
>                                 p._1._2._2 + p._2._2))
>                       }
>               )
>               if (caching) newcoords.cache
>               newcoords.count
>               if (caching) coords.unpersist(false)
>               coords = newcoords
>               val end = System.currentTimeMillis
> 
>               times(iteration) = (end-start)/1000.0
>               println("Done iteration "+iteration+" in "+times(iteration)+" seconds")
>               iteration = iteration + 1
>       } while (iteration < iterations)
> 
>       for (i <- 0 until iterations) {
>               println("Iteration "+i+": "+times(i))
>       }
> }
> 
> If you run this on a local server with caching on and off, it appears that 
> the caching does what it is supposed to do - only the latest iteration is 
> processed each time through the loop.
> 
> However, despite this, the time for each iteration still gets slower and 
> slower.
> For example, calling test(true, 5000, 100), I get the following times 
> (weeding out a few for brevity):
> Iteration 0: 0.084
> Iteration 10: 0.381
> Iteration 20: 0.674
> Iteration 30: 0.975
> Iteration 40: 1.254
> Iteration 50: 1.544
> Iteration 60: 1.802
> Iteration 70: 2.147
> Iteration 80: 2.469
> Iteration 90: 2.715
> Iteration 99: 2.962
> 
> That's a 35x increase between the first and last iteration, when it should be 
> doing the same thing each time!
> 
> Without caching, the numbers are:
> Iteration 0: 0.642
> Iteration 10: 0.516
> Iteration 20: 0.823
> Iteration 30: 1.17
> Iteration 40: 1.514
> Iteration 50: 1.655
> Iteration 60: 1.992
> Iteration 70: 2.177
> Iteration 80: 2.472
> Iteration 90: 2.814
> Iteration 99: 3.018
> 
> slightly slower - but not significantly.
> 
> Does anyone know, if the caching is working, why is iteration 100 slower than 
> iteration 1?  And why is caching making so little difference?
> 
> 
> Thanks,
>             -Nathan Kronenfeld
> 
> -- 
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
