Zhan,

This is exactly what I'm trying to do except, as I mentioned in my first
message, I am being given rdd1 and rdd2 only and I don't necessarily know
at that point whether or not rdd1 is a cached rdd. Further, I don't know at
that point whether or not rdd2 depends on rdd1.
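
For what it's worth, here is a rough sketch of the check I have in mind, assuming the only inputs are the two RDD references. The object and method names (LineageCheck, isPersisted, dependsOn, materializeParentFirst) are made up for illustration; the only Spark calls used are getStorageLevel, dependencies, id and count.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable

// Hypothetical helper object; none of these names come from Spark itself.
object LineageCheck {

  // True if the RDD has been marked for persistence via cache() or persist().
  // This does not tell us whether its partitions have been computed yet.
  def isPersisted(rdd: RDD[_]): Boolean =
    rdd.getStorageLevel != StorageLevel.NONE

  // Direct parents of an RDD, taken from its dependency list.
  private def parents(rdd: RDD[_]): List[RDD[_]] =
    rdd.dependencies.map(d => d.rdd: RDD[_]).toList

  // True if `ancestor` appears anywhere in the lineage of `rdd`.
  // Walks the dependencies depth-first, expanding each RDD id at most once.
  def dependsOn(rdd: RDD[_], ancestor: RDD[_]): Boolean = {
    val seen = mutable.Set.empty[Int]
    var pending: List[RDD[_]] = parents(rdd)
    while (pending.nonEmpty) {
      val current = pending.head
      pending = pending.tail
      if (current.id == ancestor.id) return true
      if (seen.add(current.id)) {
        pending = parents(current) ::: pending
      }
    }
    false
  }

  // Runs a cheap action on rdd1 first, but only when rdd1 is persisted and
  // rdd2 is derived from it, so later jobs on rdd2 can read the cached blocks.
  // Returns whether the extra action was run.
  def materializeParentFirst(rdd1: RDD[_], rdd2: RDD[_]): Boolean = {
    val needed = isPersisted(rdd1) && dependsOn(rdd2, rdd1)
    if (needed) rdd1.count()
    needed
  }
}
```

Note that getStorageLevel only says the RDD was marked for persistence, not that its blocks exist yet, which is why the sketch still forces an action on rdd1. This is also exactly the bookkeeping I'd rather not do by hand.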

On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:

>  In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to
> finish, probably due to writing to HDFS. A workaround for this particular
> case may be as follows.
>
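>  val rdd1 = ......cache()
>
>  val rdd2 = rdd1.map().....()
>  // Force rdd1 to be computed and cached before starting the saves.
>  rdd1.count
>  // Run both saves concurrently; each reads rdd1 from the cache.
>  future { rdd1.saveAsHadoopFile(...) }
>  future { rdd2.saveAsHadoopFile(...) }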
>
>  In this way, rdd1 will be calculated once, and two saveAsHadoopFile will
> happen concurrently.
>
>  Thanks.
>
>  Zhan Zhang
>
>
>
>  On Feb 26, 2015, at 3:28 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>  > What confused me is the statement *"The final result is that rdd1
> is calculated twice."* Is it the expected behavior?
>
>  To be perfectly honest, performing an action on a cached RDD in two
> different threads and having them block (at the partition level) until the
> parent is cached is the behavior that my coworkers and I expected.
>
> On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>>  I should probably mention that my example case is greatly oversimplified.
>> Let's say I've got a tree, a fairly complex one, where I begin a series of
>> jobs at the root which calculate a bunch of really complex joins,
>> and as I move down the tree, I'm creating reports from the data that's
>> already been joined (I've implemented logic to determine when cached items
>> can be cleaned up, e.g. the last report has been done in a subtree).
>>
>>  My issue is that the 'actions' on the RDDs are currently being
>> run in a single thread. Even if I had to wait for a cache to complete
>> fully before running the "children" jobs, I'd still be in a better place
>> than I am now, because I'd be able to run those jobs concurrently. Right
>> now this is not the case.
>>
>> > What you want is for a request for partition X to wait if partition X
>> is already being calculated in a persisted RDD.
>>
>> I totally agree and if I could get it so that it's waiting at the
>> granularity of the partition, I'd be in a much much better place. I feel
>> like I'm going down a rabbit hole and working against the Spark API.
>>
>>
>> On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> To distill this a bit further, I don't think you actually want rdd2 to
>>> wait on rdd1 in this case. What you want is for a request for
>>> partition X to wait if partition X is already being calculated in a
>>> persisted RDD. Otherwise the first partition of rdd2 waits on the
>>> final partition of rdd1 even when the rest is ready.
>>>
>>> That is probably a good idea in almost all cases. I don't know how
>>> hard it is to implement, but I speculate that it's easier to deal
>>> with at that level than as a function of the dependency graph.
>>>
>>> On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet <cjno...@gmail.com> wrote:
>>> > I'm trying to do the scheduling myself now, to determine that rdd2 depends
>>> > on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can
>>> > do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure
>>> > this out so I don't need to think about all this.
>>>
>>
>>
>
>
