In Pig, we implement this with three vertices: Vertex1 (Load with
Combiner) and Vertex2 (Load with Combiner) -> Vertex3 (Group by). Vertex1 and
Vertex2 are made part of a VertexGroup (a logical abstraction, not a real
vertex), so that Vertex3 sees their combined output as a single input.
This approach also works well if Vertex1 and Vertex2 are intermediate
vertices rather than root vertices with MRInput.
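
To make the data flow concrete, here is a toy model of the three-vertex shape
in plain Java. It does not use the Tez or Pig APIs at all; the class and
method names (VertexGroupSketch, loadWithCombiner, groupBy) are made up for
illustration, and summing per key is just an assumed example aggregation. The
"vertex group" is only a list of the two load outputs, which mirrors the point
that it is a logical grouping rather than a vertex that does work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: hypothetical names, no Tez or Pig classes involved.
public class VertexGroupSketch {

    // Stands in for Vertex1 / Vertex2: read records and apply a combiner
    // (here, a partial sum per key) before handing data downstream.
    static Map<String, Long> loadWithCombiner(List<Map.Entry<String, Long>> records) {
        Map<String, Long> partial = new TreeMap<>();
        for (Map.Entry<String, Long> record : records) {
            partial.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return partial;
    }

    // Stands in for Vertex3: it receives the outputs of every member of the
    // group as one logical input and performs the final aggregation.
    static Map<String, Long> groupBy(List<Map<String, Long>> vertexGroupOutputs) {
        Map<String, Long> result = new TreeMap<>();
        for (Map<String, Long> output : vertexGroupOutputs) {
            output.forEach((key, value) -> result.merge(key, value, Long::sum));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two inputs read from different paths (made-up sample data).
        List<Map.Entry<String, Long>> input1 =
                List.of(Map.entry("a", 1L), Map.entry("b", 2L), Map.entry("a", 3L));
        List<Map.Entry<String, Long>> input2 =
                List.of(Map.entry("b", 5L), Map.entry("c", 7L));

        // The "vertex group" is just the collection of member outputs;
        // there is no separate merge step between the loads and the group-by.
        List<Map<String, Long>> vertexGroup = new ArrayList<>();
        vertexGroup.add(loadWithCombiner(input1));
        vertexGroup.add(loadWithCombiner(input2));

        System.out.println(groupBy(vertexGroup)); // prints {a=4, b=7, c=7}
    }
}
```

In the unoptimized four-vertex plan, an extra union/merge vertex would sit
between the two loads and the group-by and re-shuffle the data once more; the
sketch above skips that hop.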

https://github.com/apache/pig/blob/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
(Plan using VertexGroup and 3 vertices)
https://github.com/apache/pig/blob/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
(This is the unoptimized plan with 4 vertices, which is similar to your
current Cascading plan)

On Tue, Feb 14, 2017 at 3:20 PM, Piyush Narang <pnar...@twitter.com> wrote:

> Thanks for getting back Rohini and Siddharth. To provide some context, we
> have two input vertices, each reading LZO Thrift data from a different path
> on HDFS. We then merge
> <http://docs.cascading.org/cascading/2.0/javadoc/cascading/pipe/Merge.html>
> the data from the two vertices, followed by a groupBy and some aggregations
> on one of the fields. In MR, the reading from the 2 inputs and the merge happens on
> the mappers and the group + aggregations on the reducers. In case of Tez we
> have the merge on a different vertex and the group + aggregations on a
> different vertex (with Cascading choosing scatter gather edges in both
> cases). Exploring if it would be possible to combine the merge with the
> groupBy in Cascading. I was wondering if the MultiMRInput would have been
> an option in cases where we read from 2 or more sources and follow that up
> with a merge. That might be an option to explore if we're not able to
> collapse the merge and groupBy.
>
> On Tue, Feb 14, 2017 at 9:14 AM, Siddharth Seth <ss...@apache.org> wrote:
>
>> What operations are being performed by these vertices? If there's no
>> advantage of reading multiple sources in a single task - using separate
>> vertices is preferable. At least for Hive, when it read multiple sources in
>> the same vertex, it had to perform some tagging etc for the reduce side to
>> differentiate the inputs.
>> MultiMRInput can be used for public consumption. Like Rohini mentioned,
>> it is used for SMB joins in Hive. IIRC, hive ends up setting this up to
>> read multiple buckets within the same vertex/task.
>> Also - it is possible to hook multiple MRInputs into a single vertex.
>> That will require a custom vertex manager to figure out the parallelism,
>> and how splits from these sources are to be combined. Hive does this for
>> SMB joins, where it'll send a single bucket / groups of buckets from
>> different sources to the same task. (Both sides ordered, and bucketed - so
>> it's possible to do a merge join in this vertex).
>>
>>
>> On Mon, Feb 13, 2017 at 5:37 PM, Piyush Narang <pnar...@twitter.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> While debugging the DAG generated by a Scalding / Cascading job, I
>>> noticed that in Tez we end up with two input vertices - one vertex for each
>>> input path. In case of Hadoop on the other hand we end up with our map
>>> phase reading from both input datasets. Is this supported in Tez? I noticed
>>> that Cascading is currently using MRInput
>>> <https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java>
>>> to set up its Tez inputs. I wasn't sure if we could use MultiMRInput
>>> <https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java>
>>> to read from multiple input directories in the same vertex in Tez or if it has
>>> a different purpose. If we can use it, is it safe for public consumption?
>>> (I noticed it is still annotated with @Evolving).
>>>
>>> Thanks,
>>>
>>> --
>>> - Piyush
>>>
>>
>>
>
>
> --
> - Piyush
>
