If I understand correctly, you are objecting to reshuffling foo_grouped on col_a
(its group key) when it was just shuffled on col_a to produce it.

I've considered this. We currently don't take existing partitioning into
account; that would be handy. But we need to evaluate the cost/benefit here.

This only works when both group operators run with the same parallelism
(number of reducers), since the partition a key is hashed to depends on the
reducer count.
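To make the parallelism constraint concrete, here is a minimal Python sketch. It assumes a Hadoop-style hash partitioner on int keys, i.e. partition = (hashCode & Integer.MAX_VALUE) % numReducers, where an int's hashCode is its own value; Pig's real partitioning may differ in detail, and the function names are hypothetical.

```python
# Minimal sketch: do int keys land in the same partition under two reducer counts?
# Assumes a Hadoop-style hash partitioner; for Java ints, hashCode() is the value itself.

INT_MAX = 0x7FFFFFFF  # Integer.MAX_VALUE, used to clear the sign bit


def partition(key: int, num_reducers: int) -> int:
    """Mimics (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks for an int key."""
    return (key & INT_MAX) % num_reducers


def co_partitioned(keys, n_first: int, n_second: int) -> bool:
    """True if every key gets the same partition index in both jobs."""
    return all(partition(k, n_first) == partition(k, n_second) for k in keys)


if __name__ == "__main__":
    keys = range(-50, 50)
    print(co_partitioned(keys, 10, 10))  # True: same parallelism, partitions line up
    print(co_partitioned(keys, 10, 20))  # False: e.g. key 15 goes to 5 vs 15
```

With equal reducer counts, partition i of the first job holds exactly the keys the second job would route to reducer i; with different counts that correspondence breaks.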

It doesn't reduce the number of MR jobs. The second job would need mappers
only to read the second relation (bar in your example), and its reducers would
have to pull the previously generated foo_grouped output as side files.
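A toy, in-memory Python sketch of the plan being discussed, under loudly stated assumptions: the same hash partitioner and the same reducer count in both jobs, and hypothetical function names (job1_group_foo, job2_cogroup) that do not correspond to anything in Pig's MR compiler. Job 2 maps and shuffles only bar, and reducer i combines it with partition i of job 1's output, read directly as a side file.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical row shapes mirroring the example script.
FooRow = Tuple[int, int]                  # (col_a, col_b)
FooGroupedRow = Tuple[int, List[FooRow]]  # (group, foo bag)
BarRow = Tuple[int, int]                  # (col_a, col_b)


def partition(key: int, n: int) -> int:
    # Assumed Hadoop-style hash partitioning of an int key.
    return (key & 0x7FFFFFFF) % n


def job1_group_foo(foo: List[FooRow], n: int) -> List[List[FooGroupedRow]]:
    """Job 1: GROUP foo BY col_a; returns one key-sorted output file per reducer."""
    parts: List[Dict[int, List[FooRow]]] = [defaultdict(list) for _ in range(n)]
    for row in foo:
        parts[partition(row[0], n)][row[0]].append(row)
    return [sorted(p.items()) for p in parts]


def job2_cogroup(side_files: List[List[FooGroupedRow]], bar: List[BarRow], n: int):
    """Job 2: map and shuffle only bar; reducer i also reads side file i as-is.

    Only valid if n equals the reducer count job 1 used to write side_files.
    """
    shuffled: List[List[BarRow]] = [[] for _ in range(n)]
    for row in bar:  # the only remaining map-side work
        shuffled[partition(row[0], n)].append(row)
    out = []
    for i in range(n):  # one iteration per reducer
        # A dict stands in for merging two key-sorted streams.
        groups: Dict[int, Tuple[list, list]] = defaultdict(lambda: ([], []))
        for fg_row in side_files[i]:  # already grouped by job 1, not reshuffled
            groups[fg_row[0]][0].append(fg_row)
        for bar_row in shuffled[i]:
            groups[bar_row[0]][1].append(bar_row)
        out.extend((k, fg, b) for k, (fg, b) in groups.items())
    return sorted(out, key=lambda t: t[0])


if __name__ == "__main__":
    foo = [(1, 10), (2, 20), (1, 11)]
    bar = [(1, 100), (3, 300)]
    side_files = job1_group_foo(foo, n=2)
    for row in job2_cogroup(side_files, bar, n=2):
        print(row)
```

If job 2 ran with a different reducer count than job 1, side file i would no longer hold exactly the keys the shuffle routes to reducer i, so the cogroup would silently split groups.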

As far as I know, Hadoop doesn't specify locality policies for reducers, so
the side files would likely need to be read over the network. This might be
faster than what happens now, because you avoid some IO and the sort, but I am
not convinced it materially affects total runtime for non-trivial examples.
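One cheap way to frame the estimate is to count bytes per resource for the second job under each plan. The Python sketch below is a crude model with made-up inputs: g is the size of foo_grouped's intermediate output, b the size of bar's map output, and remote_fraction the share of side-file bytes that reducers must fetch over the network (1.0 if there is no reducer locality). It ignores task startup, compression, combiners and reduce-side merge passes.

```python
from dataclasses import astuple, dataclass


@dataclass
class Job2Cost:
    """Rough byte counts for the second MR job (the COGROUP)."""
    disk_read: float     # bytes read from HDFS or local disk by mappers and reducers
    spill_write: float   # bytes written by the map-side sort/spill
    network: float       # bytes moved between nodes
    sorted_bytes: float  # bytes that go through the map-side sort


def current_plan(g: float, b: float) -> Job2Cost:
    # Mappers read foo_grouped's temp output and bar, sort and spill both,
    # and reducers fetch both through the shuffle.
    return Job2Cost(disk_read=g + b, spill_write=g + b, network=g + b, sorted_bytes=g + b)


def side_file_plan(g: float, b: float, remote_fraction: float = 1.0) -> Job2Cost:
    # Mappers read and sort only bar; reducers read foo_grouped's partitions
    # directly as side files, a remote_fraction of them over the network.
    return Job2Cost(disk_read=g + b, spill_write=b, network=b + remote_fraction * g, sorted_bytes=b)


def savings(g: float, b: float, remote_fraction: float = 1.0) -> Job2Cost:
    """Per-resource difference: current plan minus side-file plan."""
    cur = astuple(current_plan(g, b))
    new = astuple(side_file_plan(g, b, remote_fraction))
    return Job2Cost(*(c - n for c, n in zip(cur, new)))


if __name__ == "__main__":
    # Hypothetical sizes in GB: a large foo_grouped and a small bar.
    print(savings(g=100, b=10))
    print(savings(g=100, b=10, remote_fraction=0.5))
```

Under these assumptions the side-file plan saves one map-side spill and sort of foo_grouped, while disk reads and network traffic stay roughly the same unless reducers happen to land next to their side files.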

Do you have any cheap ways to estimate how much savings that gets us in
these kinds of scenarios?

D

On Tue, Aug 30, 2011 at 1:20 PM, Kevin Burton wrote:

> I think this is similar to the 'merge' join issue not being automatically
> supported.
>
> If we have done a GROUP in the past, this data should have been mapped, then
> handed off to the reducers and stored on those nodes.
>
> They should be nicely tiled and ready for a tiled merge join with 1/Nth of
> the data on each of your nodes (ignoring replicas of course).
>
> Now if COGROUP comes along, and has the previous GROUP as part of the
> expression, it should be able to use the previous data already reduced and
> on disk as the source of one of the relations.
>
> The other relation may need to be map reduced first of course.
>
> I tried to come up with a simple Pig script to EXPLAIN the problem. I think
> this one will do it:
>
>
>     foo = LOAD 'group_test1.csv' USING PigStorage(',') AS (col_a:int, col_b:int);
>     bar = LOAD 'group_test2.csv' USING PigStorage(',') AS (col_a:int, col_b:int);
>     foo_grouped = GROUP foo BY col_a;
>     both_grouped = GROUP foo_grouped BY $0, bar BY col_a;
>     STORE foo_grouped INTO 'foo_grouped';
>     STORE both_grouped INTO 'both_grouped';
>
>
> It first does a global rearrange to form foo_grouped, but then does it again
> to form both_grouped, which of course seems like no fun.
>
> Here is the EXPLAIN output (assuming I'm reading it right):
>
>
> #-----------------------------------------------
> # New Logical Plan:
> #-----------------------------------------------
> foo_grouped: (Name: LOStore Schema: group#42:int,foo#43:bag{#57:tuple(col_a#15:int,col_b#16:int)})
> |
> |---foo_grouped: (Name: LOSplitOutput Schema: group#42:int,foo#43:bag{#57:tuple(col_a#15:int,col_b#16:int)})
>    |   |
>    |   (Name: Constant Type: boolean Uid: 41)
>    |
>    |---foo_grouped: (Name: LOSplit Schema: group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
>        |
>        |---foo_grouped: (Name: LOCogroup Schema: group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
>            |   |
>            |   col_a:(Name: Project Type: int Uid: 15 Input: 0 Column: 0)
>            |
>            |---foo: (Name: LOForEach Schema: col_a#15:int,col_b#16:int)
>                |   |
>                |   (Name: LOGenerate[false,false] Schema: col_a#15:int,col_b#16:int)ColumnPrune:InputUids=[16, 15]ColumnPrune:OutputUids=[16, 15]
>                |   |   |
>                |   |   (Name: Cast Type: int Uid: 15)
>                |   |   |
>                |   |   |---col_a:(Name: Project Type: bytearray Uid: 15 Input: 0 Column: (*))
>                |   |   |
>                |   |   (Name: Cast Type: int Uid: 16)
>                |   |   |
>                |   |   |---col_b:(Name: Project Type: bytearray Uid: 16 Input: 1 Column: (*))
>                |   |
>                |   |---(Name: LOInnerLoad[0] Schema: col_a#15:bytearray)
>                |   |
>                |   |---(Name: LOInnerLoad[1] Schema: col_b#16:bytearray)
>                |
>                |---foo: (Name: LOLoad Schema: col_a#15:bytearray,col_b#16:bytearray)RequiredFields:null
>
> both_grouped: (Name: LOStore Schema: group#47:int,foo_grouped#48:bag{#60:tuple(group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})},bar#50:bag{#61:tuple(col_a#17:int,col_b#18:int)})
> |
> |---both_grouped: (Name: LOCogroup Schema: group#47:int,foo_grouped#48:bag{#60:tuple(group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})},bar#50:bag{#61:tuple(col_a#17:int,col_b#18:int)})
>    |   |
>    |   group:(Name: Project Type: int Uid: 45 Input: 0 Column: 0)
>    |   |
>    |   col_a:(Name: Project Type: int Uid: 17 Input: 1 Column: 0)
>    |
>    |---foo_grouped: (Name: LOSplitOutput Schema: group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})
>    |   |   |
>    |   |   (Name: Constant Type: boolean Uid: 44)
>    |   |
>    |   |---foo_grouped: (Name: LOSplit Schema: group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
>    |       |
>    |       |---foo_grouped: (Name: LOCogroup Schema: group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
>    |           |   |
>    |           |   col_a:(Name: Project Type: int Uid: 15 Input: 0 Column: 0)
>    |           |
>    |           |---foo: (Name: LOForEach Schema: col_a#15:int,col_b#16:int)
>    |               |   |
>    |               |   (Name: LOGenerate[false,false] Schema: col_a#15:int,col_b#16:int)ColumnPrune:InputUids=[16, 15]ColumnPrune:OutputUids=[16, 15]
>    |               |   |   |
>    |               |   |   (Name: Cast Type: int Uid: 15)
>    |               |   |   |
>    |               |   |   |---col_a:(Name: Project Type: bytearray Uid: 15 Input: 0 Column: (*))
>    |               |   |   |
>    |               |   |   (Name: Cast Type: int Uid: 16)
>    |               |   |   |
>    |               |   |   |---col_b:(Name: Project Type: bytearray Uid: 16 Input: 1 Column: (*))
>    |               |   |
>    |               |   |---(Name: LOInnerLoad[0] Schema: col_a#15:bytearray)
>    |               |   |
>    |               |   |---(Name: LOInnerLoad[1] Schema: col_b#16:bytearray)
>    |               |
>    |               |---foo: (Name: LOLoad Schema: col_a#15:bytearray,col_b#16:bytearray)RequiredFields:null
>    |
>    |---bar: (Name: LOForEach Schema: col_a#17:int,col_b#18:int)
>        |   |
>        |   (Name: LOGenerate[false,false] Schema: col_a#17:int,col_b#18:int)ColumnPrune:InputUids=[17, 18]ColumnPrune:OutputUids=[17, 18]
>        |   |   |
>        |   |   (Name: Cast Type: int Uid: 17)
>        |   |   |
>        |   |   |---col_a:(Name: Project Type: bytearray Uid: 17 Input: 0 Column: (*))
>        |   |   |
>        |   |   (Name: Cast Type: int Uid: 18)
>        |   |   |
>        |   |   |---col_b:(Name: Project Type: bytearray Uid: 18 Input: 1 Column: (*))
>        |   |
>        |   |---(Name: LOInnerLoad[0] Schema: col_a#17:bytearray)
>        |   |
>        |   |---(Name: LOInnerLoad[1] Schema: col_b#18:bytearray)
>        |
>        |---bar: (Name: LOLoad Schema: col_a#17:bytearray,col_b#18:bytearray)RequiredFields:null
>
> #-----------------------------------------------
> # Physical Plan:
> #-----------------------------------------------
> foo_grouped: Store(file:///Users/burton/projects/foo/foo_grouped:org.apache.pig.builtin.PigStorage) - scope-15
> |
> |---foo_grouped: Filter[bag] - scope-13
>    |   |
>    |   Constant(true) - scope-14
>    |
>    |---foo_grouped: Split - scope-12
>        |
>        |---foo_grouped: Package[tuple]{int} - scope-9
>            |
>            |---foo_grouped: Global Rearrange[tuple] - scope-8
>                |
>                |---foo_grouped: Local Rearrange[tuple]{int}(false) - scope-10
>                    |   |
>                    |   Project[int][0] - scope-11
>                    |
>                    |---foo: New For Each(false,false)[bag] - scope-7
>                        |   |
>                        |   Cast[int] - scope-2
>                        |   |
>                        |   |---Project[bytearray][0] - scope-1
>                        |   |
>                        |   Cast[int] - scope-5
>                        |   |
>                        |   |---Project[bytearray][1] - scope-4
>                        |
>                        |---foo: Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) - scope-0
>
> both_grouped: Store(file:///Users/burton/projects/foo/both_grouped:org.apache.pig.builtin.PigStorage) - scope-32
> |
> |---both_grouped: Package[tuple]{int} - scope-27
>    |
>    |---both_grouped: Global Rearrange[tuple] - scope-26
>        |
>        |---both_grouped: Local Rearrange[tuple]{int}(false) - scope-28
>        |   |   |
>        |   |   Project[int][0] - scope-29
>        |   |
>        |   |---foo_grouped: Filter[bag] - scope-16
>        |       |   |
>        |       |   Constant(true) - scope-17
>        |       |
>        |       |---foo_grouped: Split - scope-12
>        |           |
>        |           |---foo_grouped: Package[tuple]{int} - scope-9
>        |               |
>        |               |---foo_grouped: Global Rearrange[tuple] - scope-8
>        |                   |
>        |                   |---foo_grouped: Local Rearrange[tuple]{int}(false) - scope-10
>        |                       |   |
>        |                       |   Project[int][0] - scope-11
>        |                       |
>        |                       |---foo: New For Each(false,false)[bag] - scope-7
>        |                           |   |
>        |                           |   Cast[int] - scope-2
>        |                           |   |
>        |                           |   |---Project[bytearray][0] - scope-1
>        |                           |   |
>        |                           |   Cast[int] - scope-5
>        |                           |   |
>        |                           |   |---Project[bytearray][1] - scope-4
>        |                           |
>        |                           |---foo: Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) - scope-0
>        |
>        |---both_grouped: Local Rearrange[tuple]{int}(false) - scope-30
>            |   |
>            |   Project[int][0] - scope-31
>            |
>            |---bar: New For Each(false,false)[bag] - scope-25
>                |   |
>                |   Cast[int] - scope-20
>                |   |
>                |   |---Project[bytearray][0] - scope-19
>                |   |
>                |   Cast[int] - scope-23
>                |   |
>                |   |---Project[bytearray][1] - scope-22
>                |
>                |---bar: Load(file:///Users/burton/projects/foo/group_test2.csv:PigStorage(',')) - scope-18
>
> 2011-08-30 13:10:30,404 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
> 2011-08-30 13:10:30,492 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 3
> 2011-08-30 13:10:30,493 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - Merged 1 map-only splittees.
> 2011-08-30 13:10:30,495 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - Merged 1 out of total 3 MR operators.
> 2011-08-30 13:10:30,495 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 2
> #--------------------------------------------------
> # Map Reduce Plan
> #--------------------------------------------------
> MapReduce node scope-33
> Map Plan
> foo_grouped: Local Rearrange[tuple]{int}(false) - scope-10
> |   |
> |   Project[int][0] - scope-11
> |
> |---foo: New For Each(false,false)[bag] - scope-7
>    |   |
>    |   Cast[int] - scope-2
>    |   |
>    |   |---Project[bytearray][0] - scope-1
>    |   |
>    |   Cast[int] - scope-5
>    |   |
>    |   |---Project[bytearray][1] - scope-4
>    |
>    |---foo: Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) - scope-0--------
> Reduce Plan
> Split - scope-42
> |   |
> |   foo_grouped: Store(file:///Users/burton/projects/foo/foo_grouped:org.apache.pig.builtin.PigStorage) - scope-15
> |   |
> |   Store(file:/tmp/temp1306726464/tmp-52335994:org.apache.pig.impl.io.InterStorage) - scope-34
> |
> |---foo_grouped: Package[tuple]{int} - scope-9--------
> Global sort: false
> ----------------
>
> MapReduce node scope-40
> Map Plan
> Union[tuple] - scope-41
> |
> |---both_grouped: Local Rearrange[tuple]{int}(false) - scope-28
> |   |   |
> |   |   Project[int][0] - scope-29
> |   |
> |   |---Load(file:/tmp/temp1306726464/tmp-52335994:org.apache.pig.impl.io.InterStorage) - scope-37
> |
> |---both_grouped: Local Rearrange[tuple]{int}(false) - scope-30
>    |   |
>    |   Project[int][0] - scope-31
>    |
>    |---bar: New For Each(false,false)[bag] - scope-25
>        |   |
>        |   Cast[int] - scope-20
>        |   |
>        |   |---Project[bytearray][0] - scope-19
>        |   |
>        |   Cast[int] - scope-23
>        |   |
>        |   |---Project[bytearray][1] - scope-22
>        |
>        |---bar: Load(file:///Users/burton/projects/foo/group_test2.csv:PigStorage(',')) - scope-18--------
> Reduce Plan
> both_grouped: Store(file:///Users/burton/projects/foo/both_grouped:org.apache.pig.builtin.PigStorage) - scope-32
> |
> |---both_grouped: Package[tuple]{int} - scope-27--------
> Global sort: false
> ----------------
>
>
> --
>
> Founder/CEO Spinn3r.com
>
> Location: *San Francisco, CA*
> Skype: *burtonator*
>
> Skype-in: *(415) 871-0687*
>
