Zhan,

This is exactly what I'm trying to do, except that, as I mentioned in my first message, I am given only rdd1 and rdd2, and I don't necessarily know at that point whether or not rdd1 is a cached RDD. Further, I don't know at that point whether or not rdd2 depends on rdd1.
On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:

> In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to
> finish, probably due to writing to HDFS. A workaround for this particular
> case may be as follows:
>
> val rdd1 = ......cache()
> val rdd2 = rdd1.map().....()
> rdd1.count
> future { rdd1.saveAsHadoopFile(...) }
> future { rdd2.saveAsHadoopFile(...) }
>
> In this way, rdd1 will be calculated once, and the two saveAsHadoopFile
> calls will happen concurrently.
>
> Thanks.
>
> Zhan Zhang
>
> On Feb 26, 2015, at 3:28 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> What confused me is the statement "The final result is that rdd1
>> is calculated twice." Is it the expected behavior?
>
> To be perfectly honest, performing an action on a cached RDD in two
> different threads and having them (at the partition level) block until the
> parents are cached would be the behavior that I and all my coworkers
> expected.
>
> On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I should probably mention that my example case is much oversimplified.
>> Let's say I've got a tree, a fairly complex one, where I begin a series
>> of jobs at the root which calculate a bunch of really complex joins, and
>> as I move down the tree, I'm creating reports from the data that's
>> already been joined (I've implemented logic to determine when cached
>> items can be cleaned up, e.g. when the last report in a subtree has been
>> run).
>>
>> My issue is that the 'actions' on the RDDs are currently being run in a
>> single thread. Even if I'm waiting on a cache to complete fully before I
>> run the "children" jobs, I'm still in a better place than I was, because
>> I'm able to run those jobs concurrently; right now this is not the case.
>>
>> > What you want is for a request for partition X to wait if partition X
>> > is already being calculated in a persisted RDD.
>>
>> I totally agree, and if I could get it so that it's waiting at the
>> granularity of the partition, I'd be in a much, much better place. I
>> feel like I'm going down a rabbit hole and working against the Spark
>> API.
>>
>> On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> To distill this a bit further, I don't think you actually want rdd2 to
>>> wait on rdd1 in this case. What you want is for a request for
>>> partition X to wait if partition X is already being calculated in a
>>> persisted RDD. Otherwise the first partition of rdd2 waits on the
>>> final partition of rdd1 even when the rest is ready.
>>>
>>> That is probably a good idea in almost all cases. I don't know how
>>> hard it is to implement, but I speculate that it's easier to deal with
>>> it at that level than as a function of the dependency graph.
>>>
>>> On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>> > I'm trying to do the scheduling myself now: to determine that rdd2
>>> > depends on rdd1 and that rdd1 is a persistent RDD (storage level !=
>>> > NONE), so that I can do the no-op on rdd1 before I run rdd2. I would
>>> > much rather the DAG figure this out so I don't need to think about
>>> > all this.
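For reference, the manual check Corey describes above (detecting that an RDD is marked persistent and forcing it with a no-op action before kicking off downstream jobs) could look roughly like the sketch below. This is only an illustration, not code from the thread; the helper name is made up, and it uses Spark's public RDD.getStorageLevel and StorageLevel.NONE.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper: if the RDD has a storage level set (i.e. it was
// cached/persisted), run a cheap no-op action so that it is fully
// computed and cached before any dependent jobs are launched in parallel.
def materializeIfPersistent(rdd: RDD[_]): Unit = {
  if (rdd.getStorageLevel != StorageLevel.NONE) {
    // count() forces every partition to be computed and stored.
    rdd.count()
  }
}
```

Note this only tells you whether rdd1 is marked for persistence; as Corey says, it does not tell you whether rdd2 actually depends on rdd1, which is the part he would rather have the DAG scheduler figure out.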
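Zhan's workaround from earlier in the thread, filled out into a self-contained sketch: a SparkContext `sc` is assumed, the input path, map function, and output paths are placeholders, and saveAsTextFile is used in place of saveAsHadoopFile (which requires a key-value RDD) purely to keep the example short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val rdd1 = sc.textFile("hdfs:///input").cache()   // mark rdd1 for caching
val rdd2 = rdd1.map(_.toUpperCase)                // rdd2 depends on rdd1

// Blocking action: forces rdd1 to be computed and cached exactly once.
rdd1.count()

// Both saves now read rdd1 from the cache and can run concurrently,
// each in its own thread via a Future.
val f1 = Future { rdd1.saveAsTextFile("hdfs:///out1") }
val f2 = Future { rdd2.saveAsTextFile("hdfs:///out2") }
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
```

This avoids computing rdd1 twice, at the cost of the up-front blocking count; it does not give the partition-level waiting Sean describes, which would let rdd2's early partitions start before rdd1's final partition is cached.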