> ...which means that the join stage could process ‘hash’ while waiting for
> ‘probe’, making it a 3-stage DAG. However, what you see is a 4-stage DAG,
> since the join will require a shuffle on ‘hash’.

I guess the 3-stage DAG refers to the DAG in Spark, and the 4-stage DAG
refers to the DAG you built in Tez. However, I think you can use a 3-stage
(3-vertex) DAG in Tez as follows. A processes the hash data and B processes
the probe data. The edge between A and C is a one-to-one edge, while the edge
between B and C is scatter-gather. C is a little tricky in that it does both
the reduce and the join, which works as long as the reduce key and the join
key are the same.


A     B
  \   /
   C
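Below is a rough sketch of what each task of C would compute with this layout. It assumes that A's task i already holds only the ‘hash’ keys that a shared partitioner assigns to partition i, because a one-to-one edge does no repartitioning. It also assumes that A and C run the same number of tasks. Plain Scala collections stand in for Tez Inputs. `partitionOf`, `scatter` and `runTask` are hypothetical helpers, not Tez API.

```scala
// Hypothetical sketch: plain Scala collections stand in for Tez Inputs/Outputs.
object VertexCSketch {
  // Shared partitioner (assumption). A must already have laid out 'hash' with
  // it, because the one-to-one edge A -> C hands task i's output to C's task i
  // without repartitioning.
  def partitionOf(key: String, numPartitions: Int): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // What the scatter-gather edge B -> C does: route each record to the C task
  // that owns its key.
  def scatter[V](records: Seq[(String, V)], numPartitions: Int): Map[Int, Seq[(String, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, numPartitions) }

  // One task of C: reduce its slice of 'probe' by key, then join it with the
  // co-partitioned slice of 'hash'. Only correct if reduce key == join key.
  def runTask(hashPart: Seq[(String, String)],
              probePart: Seq[(String, Int)],
              combine: (Int, Int) => Int): Seq[(String, (String, Int))] = {
    val reduced: Map[String, Int] =
      probePart.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).reduce(combine) }
    hashPart.flatMap { case (k, h) => reduced.get(k).map(p => (k, (h, p))) }
  }
}
```

Both sides here go through the same `partitionOf`, which is what makes the one-to-one edge safe. If A used a different layout, C's task i could miss matching keys. That seems to be the data-organization concern raised further down the thread.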



On Wed, May 20, 2015 at 3:25 PM, Siddharth Seth wrote:

> I'm assuming you intend to have one vertex for 'joined', which processes
> 'hash' while waiting for data to come in from 'probe' (which is doing a
> shuffle/partition)?
> The Processor on the 'joined' vertex can absolutely go ahead and process
> 'hash' while the Shuffle from the other side is happening. It can be
> notified when the Shuffle is complete via waitForAnyInputReady() /
> waitForAllInputsReady(), if that's useful for this scenario.
>
> Data organization would probably make a difference, though, as would how
> 'hash' is to be consumed by the different partitions.
>
>
> On Tue, May 19, 2015 at 4:19 AM, Oleg Zhurakousky wrote:
>
>> Thanks Sid
>>
>> Let’s take a classic join as an example, which has hash and probe
>> inputs. It is currently assumed that both inputs will be DataSources or
>> Shuffles. This means that even in cases where you would not need a
>> shuffle, you now have to create one.
>> Let me describe it via Spark:
>>
>>  val hash = rdd.map(..)
>>
>>  val probe = rdd.map(..).reduceByKey(..)
>>
>>  val joined = hash.join(probe)
>>
>>
>> As you can see from the above, the computation of ‘hash’ does not
>> require a shuffle, while ‘probe’ does, which means that the join stage
>> could process ‘hash’ while waiting for ‘probe’, making it a 3-stage DAG.
>> However, what you see is a 4-stage DAG, since the join will require a
>> shuffle on ‘hash’. Again, this is not about Spark; I'm just using it to
>> explain the semantics.
>>
>>
>> Now, as far as ProcessorContext goes: I know I have a handle to it in my
>> processor, but by then it is too late, since the processor is not created
>> until its inputs are available. That is why I was thinking that maybe
>> there is some flag to set in the DAG API. Am I missing something?
>>
>>  Thanks
>> Oleg
>>
>>
>> On May 19, 2015, at 2:10 AM, Siddharth Seth wrote:
>>
>> These APIs are available during execution of the Processor. They're a
>> mechanism to get notified and wait until certain Inputs are ready, or to
>> get notified of an Input being ready while another is being processed.
>> There's nothing in the DAG API for this. What are you looking to do? One
>> thing to note, though: the Inputs are not thread-safe, and should be
>> consumed from the same thread or with external synchronization.
>>
>> On Mon, May 18, 2015 at 11:07 AM, Oleg Zhurakousky wrote:
>>
>>> Thanks Sid
>>>
>>> So, any pointers on how one would interact with it? I mean, all I do is
>>> assemble the DAG, and I can’t see anything on the Vertex that would
>>> allow me to do that.
>>>
>>>  Thanks
>>>  Oleg
>>>
>>>
>>>
>>>
>>> On May 18, 2015, at 2:00 PM, Siddharth Seth wrote:
>>>
>>> There are APIs on the ProcessorContext (waitForAllInputsReady,
>>> waitForAnyInputReady) that can be used to figure out when a specific
>>> Input is ready for consumption. That should solve the first question.
>>>
>>> Regarding vertices with multiple Inputs and a Shuffle: that requires a
>>> custom VertexManager plugin to figure out how the splits are to be
>>> distributed to the various tasks. You also have to make sure that the
>>> number of tasks is set up correctly, likely according to the Shuffle edge.
>>>
>>> On Mon, May 18, 2015 at 9:00 AM, Oleg Zhurakousky wrote:
>>>
>>>> Also, while trying something related to this, I’ve noticed the
>>>> following: “A vertex with an Initial Input and a Shuffle Input are not
>>>> supported at the moment”.
>>>> Is there a target timeframe for this? A JIRA?
>>>>
>>>> Thanks
>>>> Oleg
>>>>
>>>> > On May 18, 2015, at 10:27 AM, Oleg Zhurakousky wrote:
>>>> >
>>>> > Is it possible to allow a Tez processor implementation that has
>>>> > multiple inputs to become available as soon as at least one input is
>>>> > available to be read?
>>>> > This would allow some computation to begin while waiting for the
>>>> > other inputs, which could (if the logic allows) be processed as they
>>>> > become available.
>>>> >
>>>> >
>>>> > Thanks
>>>> > Oleg
>>>>
>>>>
>>>
>>>
>>
>>
>
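For the ‘joined’ processor itself, here is a minimal, self-contained sketch of the pattern described in the thread. It builds the table from ‘hash’ right away, then blocks until the shuffled ‘probe’ input is complete. In a real Tez Processor, the blocking step would be ProcessorContext.waitForAllInputsReady (or waitForAnyInputReady), as named above. Here a `CountDownLatch` inside a hypothetical `SimulatedInput` class stands in for it. All reads happen on the calling thread, since Inputs are not thread-safe.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService}

// Hypothetical stand-in for a Tez Input whose data arrives asynchronously
// (e.g. the 'probe' side being shuffled in). Not Tez API.
final class SimulatedInput[A](fetch: () => Seq[A]) {
  private val ready = new CountDownLatch(1)
  @volatile private var data: Seq[A] = Seq.empty

  // "Fetch" on a background thread, then count down once the data is complete.
  def start(pool: ExecutorService): Unit =
    pool.execute(() => { data = fetch(); ready.countDown() })

  // Plays the role of waitForAllInputsReady for this single input.
  def awaitReady(): Unit = ready.await()

  // Blocks until the input is complete, then returns its records.
  def records: Seq[A] = { ready.await(); data }
}

object JoinProcessorSketch {
  def run(hashInput: Seq[(String, String)],
          probeInput: SimulatedInput[(String, Int)]): Seq[(String, (String, Int))] = {
    // 1. 'hash' is available immediately: build the lookup table while the
    //    probe shuffle may still be in progress.
    val table: Map[String, Seq[String]] =
      hashInput.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
    // 2. Block until the probe input is complete.
    probeInput.awaitReady()
    // 3. Probe the table; every Input is read from this one thread.
    probeInput.records.flatMap { case (k, p) =>
      table.getOrElse(k, Seq.empty).map(h => (k, (h, p)))
    }
  }
}
```

This only covers the ordering inside one running task. It does not change when the task itself is launched, which was Oleg's separate concern.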


-- 
Best Regards

Jeff Zhang
