Sorry, the edge between A and C is a broadcast edge.

On Wed, May 20, 2015 at 5:01 PM, Jeff Zhang wrote:
> > - which means that join stage could process hash and wait for probe making it a 3 stage DAG. However what you see is a 4 stage DAG, since join will require shuffle on the 'hash'.
>
> I guess the 3-stage DAG means the DAG in Spark, and the 4-stage DAG means the DAG you build in Tez. However, I think you can use a 3-stage (3-vertex) DAG in Tez as follows. A processes the hash data and B processes the probe data. The edge between A and C is a one-to-one edge, while the edge between B and C is scatter-gather. C is a little tricky in that it does both the reduce and the join, as long as the reduce key and the join key are the same.
>
>     A   B
>      \ /
>       C
>
> On Wed, May 20, 2015 at 3:25 PM, Siddharth Seth wrote:
>
>> I'm assuming you intend to have one vertex for 'joined'. This vertex processes 'hash' while waiting for data to come in from 'probe' (which is doing a shuffle / partition)?
>> The Processor on the 'joined' vertex can absolutely go ahead and process 'hash' while the Shuffle from the other side is happening. It can be notified when the Shuffle is complete via waitForInputReady(), if that's useful for this scenario.
>>
>> Data organization would probably make a difference, though, as would how 'hash' is to be consumed by different partitions.
>>
>> On Tue, May 19, 2015 at 4:19 AM, Oleg Zhurakousky wrote:
>>
>>> Thanks Sid
>>>
>>> Let's take a classic join as an example, which contains hash and probe inputs. It is currently assumed that both inputs will be DataSources or Shuffles. This means that even in cases where you would not need a shuffle, you now have to create one.
>>> Let me describe it via Spark:
>>>
>>> val hash = rdd.map(..)
>>>
>>> val probe = rdd.map(..).reduceByKey(..)
>>>
>>> val joined = hash.join(probe)
>>>
>>> As you can see from the above, the computation of 'hash' does not require a shuffle, while 'probe' does. This means that the join stage could process 'hash' and wait for 'probe', making it a 3-stage DAG. However, what you see is a 4-stage DAG, since the join will require a shuffle on 'hash'. Again, this is not about Spark; I'm just using it to explain the semantics.
>>>
>>> Now, as far as ProcessorContext goes: I know I have a handle to it in my processor. But by then it is too late, since the processor is not created until its inputs are available. That is why I was thinking that maybe there is some flag to set in the DAG API. Am I missing something?
>>>
>>> Thanks
>>> Oleg
>>>
>>> On May 19, 2015, at 2:10 AM, Siddharth Seth wrote:
>>>
>>> These APIs are available during execution of the Processor. They're a mechanism to get notified and wait until certain Inputs are ready, or to get notified of an Input being ready while another is being processed. There's nothing on the DAG API for this. What are you looking to do? One thing to note, though: the Inputs are not thread safe, and should be consumed from the same thread or with external synchronization.
>>>
>>> On Mon, May 18, 2015 at 11:07 AM, Oleg Zhurakousky wrote:
>>>
>>>> Thanks Sid
>>>>
>>>> So, any pointers on how one would interact with it? I mean, all I do is assemble the DAG, and I can't seem to find anything on the Vertex that would allow me to do that.
>>>>
>>>> Thanks
>>>> Oleg
>>>>
>>>> On May 18, 2015, at 2:00 PM, Siddharth Seth wrote:
>>>>
>>>> There are APIs on the ProcessorContext, waitForAllInputsReady and waitForAnyInputReady, which can be used to figure out when a specific Input is ready for consumption. That should solve the first question.
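The readiness calls Sid names above can be illustrated with a small, self-contained Java model. This is a sketch under stated assumptions, not Tez code. `FakeInput`, `ReadinessTracker`, `consumeInReadyOrder` and `runDemo` are hypothetical names. `ReadinessTracker.waitForAny` only mimics the assumed semantics of `ProcessorContext.waitForAnyInputReady`: it blocks until one of the given inputs is ready, then returns that input. The loop consumes each input as soon as it becomes ready, and it does all reads on one thread, in line with the note that Inputs are not thread safe.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the "process whichever input is ready first" pattern.
// FakeInput and ReadinessTracker are illustrative stand-ins, NOT Tez classes.
final class AnyInputReadyDemo {

    /** Hypothetical stand-in for an Input: a name plus the records it will yield. */
    static final class FakeInput {
        final String name;
        final List<String> records;

        FakeInput(String name, List<String> records) {
            this.name = name;
            this.records = records;
        }
    }

    /** Hypothetical tracker; waitForAny mimics the assumed semantics of waitForAnyInputReady. */
    static final class ReadinessTracker {
        private final Set<FakeInput> ready = new HashSet<>();

        /** Called by the data-fetching side (e.g. a simulated shuffle) once an input is readable. */
        synchronized void markReady(FakeInput input) {
            ready.add(input);
            notifyAll();
        }

        /** Blocks until at least one of the given inputs is ready, then returns it. */
        synchronized FakeInput waitForAny(Collection<FakeInput> inputs) throws InterruptedException {
            while (true) {
                for (FakeInput input : inputs) {
                    if (ready.contains(input)) {
                        return input;
                    }
                }
                wait(); // woken by markReady's notifyAll()
            }
        }
    }

    /** Consumes each input as soon as it is ready; all reads stay on the calling thread. */
    static List<String> consumeInReadyOrder(ReadinessTracker tracker, List<FakeInput> inputs)
            throws InterruptedException {
        List<FakeInput> pending = new ArrayList<>(inputs);
        List<String> consumed = new ArrayList<>();
        while (!pending.isEmpty()) {
            FakeInput next = tracker.waitForAny(pending);
            pending.remove(next);
            consumed.addAll(next.records); // "processing" here is just collecting the records
        }
        return consumed;
    }

    /** One scenario: 'hash' is ready at once, 'probe' becomes ready on another thread. */
    static List<String> runDemo() {
        FakeInput hash = new FakeInput("hash", List.of("h1", "h2"));
        FakeInput probe = new FakeInput("probe", List.of("p1"));
        ReadinessTracker tracker = new ReadinessTracker();
        tracker.markReady(hash);
        Thread shuffle = new Thread(() -> tracker.markReady(probe)); // simulated shuffle completing
        shuffle.start();
        try {
            List<String> result = consumeInReadyOrder(tracker, List.of(probe, hash));
            shuffle.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The simulated shuffle thread may finish before or after the first wait, so the order of the printed records can vary between runs. Only the set of consumed records is fixed.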
>>>>
>>>> Regarding vertices with multiple Inputs and a Shuffle: that requires a custom VertexManager plugin to figure out how the splits are to be distributed to the various tasks. You also have to make sure that the number of tasks is set up correctly, likely according to the Shuffle edge.
>>>>
>>>> On Mon, May 18, 2015 at 9:00 AM, Oleg Zhurakousky wrote:
>>>>
>>>>> Also, while trying something related to this, I've noticed the following: "A vertex with an Initial Input and a Shuffle Input are not supported at the moment".
>>>>> Is there a target timeframe for this? JIRA?
>>>>>
>>>>> Thanks
>>>>> Oleg
>>>>>
>>>>> > On May 18, 2015, at 10:27 AM, Oleg Zhurakousky wrote:
>>>>> >
>>>>> > Is it possible to allow a Tez processor implementation that has multiple inputs to become available as soon as at least one input is available to be read?
>>>>> > This could allow some computation to begin while waiting for other inputs. Other inputs could (if the logic allows) be processed as they become available.
>>>>> >
>>>>> > Thanks
>>>>> > Oleg
>
> --
> Best Regards
>
> Jeff Zhang
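Jeff proposes a three-vertex layout at the top of this thread. A feeds C over a broadcast edge (the corrected edge type), and B feeds C over a scatter-gather edge. C does both the reduce and the join, because the reduce key and the join key are the same.

The Java sketch below models the logic C's processor might run under those assumptions. It is a hypothetical in-memory model, not a Tez Processor. `KV`, `joinInVertexC` and `runDemo` are illustrative names, and a `CountDownLatch` stands in for "probe shuffle complete", which a real processor would learn from a call such as `waitForAllInputsReady`. The hash table is built from the broadcast side first, so that work does not wait for the shuffle. The probe side is only read after its latch is released.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical in-memory model of vertex C: broadcast 'hash' side from A,
// shuffled 'probe' side from B. Not a Tez Processor.
final class BroadcastHashJoinDemo {

    /** Hypothetical key/value record. */
    static final class KV {
        final String key;
        final int value;

        KV(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * 1) builds the hash table from the broadcast side immediately,
     * 2) waits until the probe shuffle is complete (the latch stands in for waitForAllInputsReady),
     * 3) reduces the probe side by key (sum, in the spirit of reduceByKey),
     * 4) inner-joins on the same key.
     */
    static List<String> joinInVertexC(List<KV> hashSide, CountDownLatch probeReady, List<KV> probeSide)
            throws InterruptedException {
        Map<String, List<Integer>> table = new HashMap<>();
        for (KV kv : hashSide) {
            table.computeIfAbsent(kv.key, k -> new ArrayList<>()).add(kv.value);
        }

        probeReady.await(); // the hash table above was built without waiting for the shuffle

        Map<String, Integer> reduced = new TreeMap<>(); // sorted keys keep the output stable
        for (KV kv : probeSide) {
            reduced.merge(kv.key, kv.value, Integer::sum);
        }

        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, Integer> e : reduced.entrySet()) {
            for (int h : table.getOrDefault(e.getKey(), List.of())) {
                joined.add(e.getKey() + "=(" + h + "," + e.getValue() + ")");
            }
        }
        return joined;
    }

    static List<String> runDemo() {
        List<KV> hashSide = List.of(new KV("a", 1), new KV("b", 2), new KV("c", 3));
        List<KV> probeSide = new ArrayList<>();
        CountDownLatch probeReady = new CountDownLatch(1);
        Thread shuffle = new Thread(() -> {
            probeSide.addAll(List.of(new KV("a", 10), new KV("b", 5), new KV("a", 7), new KV("d", 4)));
            probeReady.countDown(); // publishes probeSide to the joining thread
        });
        shuffle.start();
        try {
            List<String> result = joinInVertexC(hashSide, probeReady, probeSide);
            shuffle.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // [a=(1,17), b=(2,5)]
    }
}
```

With the demo data, key `a` is summed to 17 on the probe side and joined with hash value 1. Key `c` (hash side only) and key `d` (probe side only) are dropped, which matches the inner-join behaviour of `hash.join(probe)` in the Spark snippet.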
