I think this is similar to the 'merge' join issue, where the optimization is
not applied automatically.
If we have done a GROUP in the past, the data should have been mapped, then
handed off to the reducers and stored on those nodes.
It should be nicely tiled by key and ready for a tiled merge join, with 1/Nth
of the data on each of your N nodes (ignoring replicas of course).
Now if COGROUP comes along, and has the previous GROUP as part of the
expression, it should be able to use the previous data already reduced and
on disk as the source of one of the relations.
The other relation may need to be map reduced first of course.
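
To make the idea concrete, here is a rough sketch in plain Python (not Pig
internals). Everything in it is an assumption for illustration: the names
NUM_REDUCERS, partition_of, shuffle_and_group and cogroup_reusing are made up,
and the output is simplified to (key, foo tuples, bar tuples) rather than
Pig's nested bag schema. The point is only that the already-grouped side
stays where it is and just the other relation gets shuffled.

```python
from collections import defaultdict

NUM_REDUCERS = 3  # hypothetical number of reducers/nodes


def partition_of(key, n=NUM_REDUCERS):
    # Route an integer key to a reducer; stands in for a hash partitioner.
    return key % n


def shuffle_and_group(records, n=NUM_REDUCERS):
    """Simulate map + shuffle + reduce: send each (key, value) record to
    its reducer and group the records by key on that reducer."""
    partitions = [defaultdict(list) for _ in range(n)]
    for key, value in records:
        partitions[partition_of(key, n)][key].append((key, value))
    return partitions


def cogroup_reusing(grouped_partitions, other_records, n=NUM_REDUCERS):
    """Cogroup data that is already partitioned and grouped by key with a
    second relation. Only the second relation is shuffled; the first is
    read in place, partition by partition."""
    other_partitions = shuffle_and_group(other_records, n)
    result = []
    for left, right in zip(grouped_partitions, other_partitions):
        for key in sorted(set(left) | set(right)):
            result.append((key, left.get(key, []), right.get(key, [])))
    return result


foo = [(1, 10), (2, 20), (1, 11), (4, 40)]
bar = [(1, 100), (3, 300)]

foo_grouped = shuffle_and_group(foo)             # the earlier GROUP
both_grouped = cogroup_reusing(foo_grouped, bar)  # shuffles only bar
for row in both_grouped:
    print(row)
```

In this sketch the second step never re-partitions foo; it relies on both
sides using the same partitioner and the same number of reducers.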
I tried to come up with a simple pig script to EXPLAIN the problem. I think
this one will do it:
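> -- Load two CSV files with the same two-column schema.
> foo = LOAD 'group_test1.csv' USING PigStorage(',') AS (col_a:int, col_b:int);
> bar = LOAD 'group_test2.csv' USING PigStorage(',') AS (col_a:int, col_b:int);
> -- First shuffle: group foo on col_a.
> foo_grouped = GROUP foo BY col_a;
> -- Second shuffle: cogroup the already-grouped foo with bar on the same key.
> -- (GROUP with two relations is the same operator as COGROUP.)
> both_grouped = GROUP foo_grouped BY $0, bar BY col_a;
> STORE foo_grouped INTO 'foo_grouped';
> STORE both_grouped INTO 'both_grouped';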
It first does a global rearrange to form foo_grouped, but then does another
one to form both_grouped, re-shuffling data that was already grouped on the
same key. That of course seems like no fun.
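
One quick way to check this from EXPLAIN output is to count the distinct
Global Rearrange operators in the physical plan. The snippet below is a small
sketch in Python; the helper name global_rearranges and the three sample
lines are mine, trimmed from the kind of lines the physical plan prints, and
it assumes each operator sits on one unwrapped line.

```python
import re

# Matches physical-plan lines such as
#   |---foo_grouped: Global Rearrange[tuple] - scope-8
REARRANGE = re.compile(r"(\w+): Global Rearrange\[\w+\] - (scope-\d+)")


def global_rearranges(explain_text):
    """Return {scope id: alias} for each distinct Global Rearrange operator.
    The same operator can appear in more than one tree, so key on scope."""
    return {scope: alias for alias, scope in REARRANGE.findall(explain_text)}


sample = """\
|---foo_grouped: Global Rearrange[tuple] - scope-8
|---both_grouped: Global Rearrange[tuple] - scope-26
|   |---foo_grouped: Global Rearrange[tuple] - scope-8
"""

print(global_rearranges(sample))
```

Two distinct scopes means two shuffles, one per alias.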
Here is the EXPLAIN output (assuming I'm reading it right).
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
foo_grouped: (Name: LOStore Schema:
group#42:int,foo#43:bag{#57:tuple(col_a#15:int,col_b#16:int)})
|
|---foo_grouped: (Name: LOSplitOutput Schema:
group#42:int,foo#43:bag{#57:tuple(col_a#15:int,col_b#16:int)})
| |
| (Name: Constant Type: boolean Uid: 41)
|
|---foo_grouped: (Name: LOSplit Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
|
|---foo_grouped: (Name: LOCogroup Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
| |
| col_a:(Name: Project Type: int Uid: 15 Input: 0 Column: 0)
|
|---foo: (Name: LOForEach Schema: col_a#15:int,col_b#16:int)
| |
| (Name: LOGenerate[false,false] Schema:
col_a#15:int,col_b#16:int)ColumnPrune:InputUids=[16,
15]ColumnPrune:OutputUids=[16, 15]
| | |
| | (Name: Cast Type: int Uid: 15)
| | |
| | |---col_a:(Name: Project Type: bytearray Uid: 15
Input: 0 Column: (*))
| | |
| | (Name: Cast Type: int Uid: 16)
| | |
| | |---col_b:(Name: Project Type: bytearray Uid: 16
Input: 1 Column: (*))
| |
| |---(Name: LOInnerLoad[0] Schema: col_a#15:bytearray)
| |
| |---(Name: LOInnerLoad[1] Schema: col_b#16:bytearray)
|
|---foo: (Name: LOLoad Schema:
col_a#15:bytearray,col_b#16:bytearray)RequiredFields:null
both_grouped: (Name: LOStore Schema:
group#47:int,foo_grouped#48:bag{#60:tuple(group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})},bar#50:bag{#61:tuple(col_a#17:int,col_b#18:int)})
|
|---both_grouped: (Name: LOCogroup Schema:
group#47:int,foo_grouped#48:bag{#60:tuple(group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})},bar#50:bag{#61:tuple(col_a#17:int,col_b#18:int)})
| |
| group:(Name: Project Type: int Uid: 45 Input: 0 Column: 0)
| |
| col_a:(Name: Project Type: int Uid: 17 Input: 1 Column: 0)
|
|---foo_grouped: (Name: LOSplitOutput Schema:
group#45:int,foo#46:bag{#57:tuple(col_a#15:int,col_b#16:int)})
| | |
| | (Name: Constant Type: boolean Uid: 44)
| |
| |---foo_grouped: (Name: LOSplit Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
| |
| |---foo_grouped: (Name: LOCogroup Schema:
group#15:int,foo#39:bag{#57:tuple(col_a#15:int,col_b#16:int)})
| | |
| | col_a:(Name: Project Type: int Uid: 15 Input: 0 Column:
0)
| |
| |---foo: (Name: LOForEach Schema: col_a#15:int,col_b#16:int)
| | |
| | (Name: LOGenerate[false,false] Schema:
col_a#15:int,col_b#16:int)ColumnPrune:InputUids=[16,
15]ColumnPrune:OutputUids=[16, 15]
| | | |
| | | (Name: Cast Type: int Uid: 15)
| | | |
| | | |---col_a:(Name: Project Type: bytearray Uid: 15
Input: 0 Column: (*))
| | | |
| | | (Name: Cast Type: int Uid: 16)
| | | |
| | | |---col_b:(Name: Project Type: bytearray Uid: 16
Input: 1 Column: (*))
| | |
| | |---(Name: LOInnerLoad[0] Schema:
col_a#15:bytearray)
| | |
| | |---(Name: LOInnerLoad[1] Schema:
col_b#16:bytearray)
| |
| |---foo: (Name: LOLoad Schema:
col_a#15:bytearray,col_b#16:bytearray)RequiredFields:null
|
|---bar: (Name: LOForEach Schema: col_a#17:int,col_b#18:int)
| |
| (Name: LOGenerate[false,false] Schema:
col_a#17:int,col_b#18:int)ColumnPrune:InputUids=[17,
18]ColumnPrune:OutputUids=[17, 18]
| | |
| | (Name: Cast Type: int Uid: 17)
| | |
| | |---col_a:(Name: Project Type: bytearray Uid: 17 Input: 0
Column: (*))
| | |
| | (Name: Cast Type: int Uid: 18)
| | |
| | |---col_b:(Name: Project Type: bytearray Uid: 18 Input: 1
Column: (*))
| |
| |---(Name: LOInnerLoad[0] Schema: col_a#17:bytearray)
| |
| |---(Name: LOInnerLoad[1] Schema: col_b#18:bytearray)
|
|---bar: (Name: LOLoad Schema:
col_a#17:bytearray,col_b#18:bytearray)RequiredFields:null
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
foo_grouped:
Store(file:///Users/burton/projects/foo/foo_grouped:org.apache.pig.builtin.PigStorage)
- scope-15
|
|---foo_grouped: Filter[bag] - scope-13
| |
| Constant(true) - scope-14
|
|---foo_grouped: Split - scope-12
|
|---foo_grouped: Package[tuple]{int} - scope-9
|
|---foo_grouped: Global Rearrange[tuple] - scope-8
|
|---foo_grouped: Local Rearrange[tuple]{int}(false) -
scope-10
| |
| Project[int][0] - scope-11
|
|---foo: New For Each(false,false)[bag] - scope-7
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[int] - scope-5
| |
| |---Project[bytearray][1] - scope-4
|
|---foo:
Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) -
scope-0
both_grouped:
Store(file:///Users/burton/projects/foo/both_grouped:org.apache.pig.builtin.PigStorage)
- scope-32
|
|---both_grouped: Package[tuple]{int} - scope-27
|
|---both_grouped: Global Rearrange[tuple] - scope-26
|
|---both_grouped: Local Rearrange[tuple]{int}(false) - scope-28
| | |
| | Project[int][0] - scope-29
| |
| |---foo_grouped: Filter[bag] - scope-16
| | |
| | Constant(true) - scope-17
| |
| |---foo_grouped: Split - scope-12
| |
| |---foo_grouped: Package[tuple]{int} - scope-9
| |
| |---foo_grouped: Global Rearrange[tuple] - scope-8
| |
| |---foo_grouped: Local
Rearrange[tuple]{int}(false) - scope-10
| | |
| | Project[int][0] - scope-11
| |
| |---foo: New For Each(false,false)[bag] -
scope-7
| | |
| | Cast[int] - scope-2
| | |
| | |---Project[bytearray][0] - scope-1
| | |
| | Cast[int] - scope-5
| | |
| | |---Project[bytearray][1] - scope-4
| |
| |---foo:
Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) -
scope-0
|
|---both_grouped: Local Rearrange[tuple]{int}(false) - scope-30
| |
| Project[int][0] - scope-31
|
|---bar: New For Each(false,false)[bag] - scope-25
| |
| Cast[int] - scope-20
| |
| |---Project[bytearray][0] - scope-19
| |
| Cast[int] - scope-23
| |
| |---Project[bytearray][1] - scope-22
|
|---bar:
Load(file:///Users/burton/projects/foo/group_test2.csv:PigStorage(',')) -
scope-18
2011-08-30 13:10:30,404 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2011-08-30 13:10:30,492 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 3
2011-08-30 13:10:30,493 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - Merged 1 map-only splittees.
2011-08-30 13:10:30,495 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - Merged 1 out of total 3 MR operators.
2011-08-30 13:10:30,495 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 2
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-33
Map Plan
foo_grouped: Local Rearrange[tuple]{int}(false) - scope-10
| |
| Project[int][0] - scope-11
|
|---foo: New For Each(false,false)[bag] - scope-7
| |
| Cast[int] - scope-2
| |
| |---Project[bytearray][0] - scope-1
| |
| Cast[int] - scope-5
| |
| |---Project[bytearray][1] - scope-4
|
|---foo:
Load(file:///Users/burton/projects/foo/group_test1.csv:PigStorage(',')) -
scope-0--------
Reduce Plan
Split - scope-42
| |
| foo_grouped:
Store(file:///Users/burton/projects/foo/foo_grouped:org.apache.pig.builtin.PigStorage)
- scope-15
| |
|
Store(file:/tmp/temp1306726464/tmp-52335994:org.apache.pig.impl.io.InterStorage)
- scope-34
|
|---foo_grouped: Package[tuple]{int} - scope-9--------
Global sort: false
----------------
MapReduce node scope-40
Map Plan
Union[tuple] - scope-41
|
|---both_grouped: Local Rearrange[tuple]{int}(false) - scope-28
| | |
| | Project[int][0] - scope-29
| |
|
|---Load(file:/tmp/temp1306726464/tmp-52335994:org.apache.pig.impl.io.InterStorage)
- scope-37
|
|---both_grouped: Local Rearrange[tuple]{int}(false) - scope-30
| |
| Project[int][0] - scope-31
|
|---bar: New For Each(false,false)[bag] - scope-25
| |
| Cast[int] - scope-20
| |
| |---Project[bytearray][0] - scope-19
| |
| Cast[int] - scope-23
| |
| |---Project[bytearray][1] - scope-22
|
|---bar:
Load(file:///Users/burton/projects/foo/group_test2.csv:PigStorage(',')) -
scope-18--------
Reduce Plan
both_grouped:
Store(file:///Users/burton/projects/foo/both_grouped:org.apache.pig.builtin.PigStorage)
- scope-32
|
|---both_grouped: Package[tuple]{int} - scope-27--------
Global sort: false
----------------
--
Founder/CEO Spinn3r.com