Like Rohini said, this could be done with 2 vertices reading each input in parallel (so effectively the same as one larger vertex reading both inputs from parallelism POV). Then the merge and group by happening in the third (effectively second) vertex.
This plan makes it simpler to reason, reuse and modify the graph building process in automated systems like Pig. One could also create a single vertex with 2 MR inputs and do the reading and merging in that vertex and the group by in the second one. This would mean creating custom input initializer and manager for that vertex to determine the input distribution etc. Which one to choose? If the map side merge significantly reduces the shuffle data to the group by vertex, thus resulting in perf benefits, then one could explore the second plan. Bikas From: Piyush Narang [mailto:pnar...@twitter.com] Sent: Tuesday, February 14, 2017 3:42 PM To: user@tez.apache.org Subject: Re: Reading from multiple paths in a single Tez node Thanks a ton, Rohini. I'll take a look at that. On Tue, Feb 14, 2017 at 3:34 PM, Rohini Palaniswamy <rohini.adi...@gmail.com <mailto:rohini.adi...@gmail.com> > wrote: In Pig, we implement this by doing 3 vertices. Vertex1 (Load with Combiner), Vertex2 (Load with Combiner) -> Vertex3 (Group by). Vertex1 and Vertex2 are made part of a VertexGroup (logical abstraction and not a real vertex), so that their output is seen as one single output by Vertex 3. This approach also works well if Vertex1 and Vertex2 were intermediate vertices and not root vertices with MRInput. https://github.com/apache/pig/blob/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld (Plan using VertexGroup and 3 vertices) https://github.com/apache/pig/blob/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld (This is the unoptimized plan with 4 vertices which is similar to your current cascading plan) On Tue, Feb 14, 2017 at 3:20 PM, Piyush Narang <pnar...@twitter.com <mailto:pnar...@twitter.com> > wrote: Thanks for getting back Rohini and Siddharth. To provide some context, we have two input vertices each reading lzo thrift data from a different path on hdfs. We then merge <http://docs.cascading.org/cascading/2.0/javadoc/cascading/pipe/Merge.html> the data from the two vertices and then groupBy and some aggregations one of the fields. In MR, the reading from the 2 inputs and the merge happens on the mappers and the group + aggregations on the reducers. In case of Tez we have the merge on a different vertex and the group + aggregations on a different vertex (with Cascading choosing scatter gather edges in both cases). Exploring if it would be possible to combine the merge with the groupBy in Cascading. I was wondering if the MultiMRInput would have been an option in cases where we read from 2 or more sources and follow that up with a merge. That might be an option to explore if we're not able to collapse the merge and groupBy. On Tue, Feb 14, 2017 at 9:14 AM, Siddharth Seth <ss...@apache.org <mailto:ss...@apache.org> > wrote: What operations are being performed by these vertices? If there's no advantage of reading multiple sources in a single task - using separate vertices is preferable. At least for Hive, when it read multiple sources in the same vertex, it had to perform some tagging etc for the reduce side to differentiate the inputs. MultiMRInput can be used for public consumption. Like Rohini mentioned, it is used for SMB joins in Hive. IIRC, hive ends up setting this up to read multiple buckets within the same vertex/task. Also - it is possible to hook multiple MRInputs into a single vertex. That will require a custom vertex manager to figure out the parallelism, and how splits from these sources are to be combined. Hive does this for SMB joins, where it'll send a single bucket / groups of buckets from different sources to the same task. (Both sides ordered, and bucketed - so it's possible to do a merge join in this vertex). On Mon, Feb 13, 2017 at 5:37 PM, Piyush Narang <pnar...@twitter.com <mailto:pnar...@twitter.com> > wrote: hi folks, While debugging the DAG generated by a Scalding / Cascading job, I noticed that in Tez we end up with two input vertices - one vertex for each input path. In case of Hadoop on the other hand we end up with our map phase reading from both input datasets. Is this supported in Tez? I noticed that Cascading is currently using MRInput <https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java> to set up its Tez inputs. I wasn't sure if we could use MultiMRInput <https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MultiMRInput.java> to read from multiple input directories in the same vertex in Tez or if it has a different purpose. If we can use it, is it safe for public consumption? (noticed it is still annotated with @Evolving). Thanks, -- - Piyush -- - Piyush -- - Piyush