What version of Pig are you using? The example I'm providing will only work
on 0.9+, since it uses nested foreach statements.
Getting from the flattened and joined state to the grouped state you
achieved shouldn't take 5 inelegant steps. I'll give you the full working
example I tested with the 3 records in your example.
First things first. Let's load the data, flatten it, and join it with the
second dataset:
A = load 'data1'
    using PigStorage()
    as (item: chararray, d: int, things: bag{(thing: chararray, d1: int, values: bag{(v:chararray)})});
B = FOREACH A GENERATE item, d, FLATTEN(things);
C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
D = load 'data2'
    using PigStorage()
    as (value: chararray, result: chararray);
E = join C by things::values::v, D by value;
----------------------------E----------------------------------
(item1,111,thing1,222,value1,value1,result1)
(item1,111,thing1,222,value2,value2,result2)
----------------------------------------------------------------
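To make the effect of the two FLATTENs and the join concrete, here is a small Python model of relations A through E. This is a sketch under stated assumptions, not Pig code: a relation is modeled as a list of tuples and a bag as a list of tuples nested inside a field.

```python
# Python model of the Pig relations (not Pig itself):
# relation = list of tuples, bag = list of tuples stored in a field.
A = [("item1", 111, [("thing1", 222, [("value1",), ("value2",)])])]
D = [("value1", "result1"), ("value2", "result2")]

# B = FOREACH A GENERATE item, d, FLATTEN(things);
# FLATTEN on a bag emits one output row per inner tuple, splicing its fields in.
B = [(item, d) + thing for (item, d, things) in A for thing in things]

# C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
C = [(item, d, thing, d1) + v
     for (item, d, thing, d1, values) in B for v in values]

# E = join C by things::values::v, D by value;
# An inner equi-join: the last field of C must equal the first field of D.
E = [c + dd for c in C for dd in D if c[-1] == dd[0]]

for row in E:
    print(row)
```

Each printed row corresponds to one line of the E dump above.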
At this point, you have data1 joined to data2. Now you want to group your
data back. You also apparently don't care about 'item' and 'd', so you can
project those out too.
F = FOREACH E GENERATE C::things::thing as thing,
                       C::things::d1 as d1,
                       D::value as value,
                       D::result as result;
G = GROUP F by (thing, d1);
----------------------------G----------------------------------
((thing1,222),{(thing1,222,value2,result2),(thing1,222,value1,result1)})
-----------------------------------------------------------------
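A rough Python model of what GROUP produces may help explain why 'thing' and 'd1' show up twice. It is a sketch, not Pig: `group_by` is a hypothetical helper, and bag order here is input order, whereas Pig does not guarantee any order inside a bag.

```python
from collections import defaultdict

# F as a list of (thing, d1, value, result) tuples, copied from the output above.
F = [("thing1", 222, "value1", "result1"),
     ("thing1", 222, "value2", "result2")]

def group_by(rel, key):
    """Model of GROUP: one (group, bag) row per distinct key.

    The bag holds the *whole* input tuples, key fields included, which is
    why 'thing' and 'd1' appear both in the group key and in the bag.
    """
    groups = defaultdict(list)
    for t in rel:
        groups[key(t)].append(t)
    return list(groups.items())

# G = GROUP F by (thing, d1);
G = group_by(F, key=lambda t: (t[0], t[1]))
print(G)
```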
Now, you have 'thing' and 'd1' duplicated. For each tuple in G, your inner
bag needs only 'value' and 'result'. You can do this by using a nested
foreach statement.
H = FOREACH G {
    I = FOREACH F GENERATE value, result;
    GENERATE group, I;
};
----------------------------H----------------------------------
((thing1,222),{(value2,result2),(value1,result1)})
----------------------------------------------------------------
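In Python terms, the nested FOREACH is an inner loop that runs once per group and only over that group's bag. A minimal sketch, modeling bags as lists (not Pig code):

```python
# G as produced by GROUP F by (thing, d1): (group, bag of F tuples).
G = [(("thing1", 222),
      [("thing1", 222, "value2", "result2"),
       ("thing1", 222, "value1", "result1")])]

# H = FOREACH G { I = FOREACH F GENERATE value, result; GENERATE group, I; };
# The inner comprehension plays the role of the nested FOREACH over the bag.
H = [(group, [(value, result) for (_thing, _d1, value, result) in bag])
     for (group, bag) in G]
print(H)
```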
The inner bag looks good, but you don't want 'd1' in the tuple with
'thing'. You want 'd1' in a tuple with your value-result-pair-bag. You can
use the TOTUPLE UDF to accomplish this.
J = FOREACH H GENERATE group.thing as thing, TOTUPLE(group.d1, I) as values;
----------------------------J----------------------------------
(thing1,(222,{(value2,result2),(value1,result1)}))
----------------------------------------------------------------
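The same step as a Python sketch (a model of the semantics, not Pig): TOTUPLE simply wraps its arguments in one tuple, so 'd1' now travels together with the bag instead of with 'thing'.

```python
H = [(("thing1", 222), [("value2", "result2"), ("value1", "result1")])]

# J = FOREACH H GENERATE group.thing AS thing, TOTUPLE(group.d1, I) AS values;
# group[0] is 'thing', group[1] is 'd1'; (group[1], I) models TOTUPLE(group.d1, I).
J = [(group[0], (group[1], I)) for (group, I) in H]
print(J)
```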
Almost there. Now you want a bag around
(222,{(value2,result2),(value1,result1)}). First, group by 'thing'.
K = GROUP J by thing;
----------------------------K----------------------------------
(thing1,{(thing1,(222,{(value2,result2),(value1,result1)}))})
----------------------------------------------------------------
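Modeled in Python (a sketch, not Pig), GROUP again puts the whole J tuple into the bag, which is where the duplicate 'thing' comes from:

```python
from collections import defaultdict

J = [("thing1", (222, [("value2", "result2"), ("value1", "result1")]))]

# K = GROUP J by thing;  each row is (group, bag of whole J tuples).
groups = defaultdict(list)
for t in J:
    groups[t[0]].append(t)
K = list(groups.items())
print(K)
```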
Now you just need to get rid of the duplicate 'thing' in the bag. This is a
bit tricky, but can be done with a nested foreach and a flatten operator.
L = FOREACH K {
    M = FOREACH J GENERATE FLATTEN(values);
    GENERATE group as thing, M;
};
----------------------------L----------------------------------
(thing1,{(222,{(value2,result2),(value1,result1)})})
----------------------------------------------------------------
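Here is the final step as a Python model (not Pig). FLATTEN applied to a tuple splices that tuple's fields into the output row, so each inner row (thing, (d1, bag)) becomes (d1, bag) and the duplicate 'thing' disappears:

```python
K = [("thing1",
      [("thing1", (222, [("value2", "result2"), ("value1", "result1")]))])]

# L = FOREACH K { M = FOREACH J GENERATE FLATTEN(values); GENERATE group as thing, M; };
# Keeping only the 'values' tuple's fields models FLATTEN(values) on a tuple.
L = [(group, [tuple(values) for (_thing, values) in bag])
     for (group, bag) in K]
print(L)
```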
Voila! You have the result you are looking for.
Here is the full script:
A = LOAD 'data1'
    USING PigStorage()
    AS (item: chararray, d: int, things: bag{(thing: chararray, d1: int, values: bag{(v:chararray)})});
B = FOREACH A GENERATE item, d, FLATTEN(things);
C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
D = LOAD 'data2'
    USING PigStorage()
    AS (value: chararray, result: chararray);
E = JOIN C BY things::values::v, D BY value;
F = FOREACH E GENERATE C::things::thing AS thing,
                       C::things::d1 AS d1,
                       D::value AS value,
                       D::result AS result;
G = GROUP F BY (thing, d1);
H = FOREACH G {
    I = FOREACH F GENERATE value, result;
    GENERATE group, I;
};
J = FOREACH H GENERATE group.thing AS thing, TOTUPLE(group.d1, I) AS values;
K = GROUP J BY thing;
L = FOREACH K {
    M = FOREACH J GENERATE FLATTEN(values);
    GENERATE group AS thing, M;
};
You'll still have to deal with corner cases such as null values, but this
is the bulk of the algorithm.
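One corner case worth checking: Pig's JOIN is an inner join by default, so a value with no matching row in data2 silently disappears from E. Since you mentioned that a value may or may not have a result, a left outer join (in Pig, something like `E = JOIN C BY things::values::v LEFT OUTER, D BY value;`) keeps the unmatched row with nulls for D's fields. The Python sketch below models the difference; 'value3' is a hypothetical unmatched value, and `inner_join`/`left_outer_join` are hypothetical helpers, not Pig functions.

```python
C = [("item1", 111, "thing1", 222, "value1"),
     ("item1", 111, "thing1", 222, "value3")]  # hypothetical: value3 has no result
D = [("value1", "result1"), ("value2", "result2")]

def inner_join(left, right):
    """Model of JOIN: keep only left rows whose last field matches right[0]."""
    return [l + r for l in left for r in right if l[-1] == r[0]]

def left_outer_join(left, right, right_width=2):
    """Model of JOIN ... LEFT OUTER: unmatched left rows are padded with None."""
    out = []
    for l in left:
        matches = [l + r for r in right if l[-1] == r[0]]
        out.extend(matches or [l + (None,) * right_width])
    return out

print(inner_join(C, D))       # the value3 row is dropped
print(left_outer_join(C, D))  # the value3 row survives with (None, None)
```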
On a side note, to answer your original question of how to flatten nested
structures: you can either use two separate foreach statements with flatten
operations, as in the script above, or use a single nested foreach
statement.
The first way is what you already have:
A = LOAD 'data1'
    USING PigStorage()
    AS (item: chararray, d: int, things: bag{(thing: chararray, d1: int, values: bag{(v:chararray)})});
B = FOREACH A GENERATE item, d, FLATTEN(things);
C = FOREACH B GENERATE item, d, thing, d1, FLATTEN(values);
This generates the following execution plan:
MR plan size after optimization: 1
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-23
Map Plan
C: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-22
|
|---C: New For Each(false,false,false,false,true)[bag] - scope-21
    |   |
    |   Project[chararray][0] - scope-11
    |   |
    |   Project[int][1] - scope-13
    |   |
    |   Project[chararray][2] - scope-15
    |   |
    |   Project[int][3] - scope-17
    |   |
    |   Project[bag][4] - scope-19
    |
    |---B: New For Each(false,false,true)[bag] - scope-10
        |   |
        |   Cast[chararray] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Cast[int] - scope-5
        |   |
        |   |---Project[bytearray][1] - scope-4
        |   |
        |   Cast[bag:{(chararray,int,{(chararray)})}] - scope-8
        |   |
        |   |---Project[bytearray][2] - scope-7
        |
        |---A: Load(file:///home/pradeepg26/projects/pig-ml/data1:PigStorage) - scope-0--------
The second way is to use a nested foreach:
A = load 'data1'
    using PigStorage()
    as (item: chararray, d: int, things: bag{(thing: chararray, d1: int, values: bag{(v:chararray)})});
B = FOREACH A {
    C = FOREACH things GENERATE thing, d1, FLATTEN(values);
    GENERATE item, d, FLATTEN(C);
}
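To see why the two approaches give the same rows, here is a Python model of both (a sketch of the semantics, not Pig; `nested` is a hypothetical helper standing in for the nested FOREACH block):

```python
A = [("item1", 111, [("thing1", 222, [("value1",), ("value2",)])])]

# First way: two FOREACH ... FLATTEN statements, B and then C.
B = [(item, d) + t for (item, d, things) in A for t in things]
two_step = [(item, d, thing, d1) + v
            for (item, d, thing, d1, values) in B for v in values]

# Second way: one nested FOREACH over each row of A.
def nested(row):
    item, d, things = row
    # C = FOREACH things GENERATE thing, d1, FLATTEN(values);
    inner_c = [(thing, d1) + v for (thing, d1, values) in things for v in values]
    # GENERATE item, d, FLATTEN(C);
    return [(item, d) + c for c in inner_c]

one_step = [out for row in A for out in nested(row)]
print(two_step == one_step)
```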
This method generates the following execution plan.
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-26
Map Plan
B: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-25
|
|---B: New For Each(false,false,true)[bag] - scope-24
    |   |
    |   Project[chararray][0] - scope-11
    |   |
    |   Project[int][1] - scope-13
    |   |
    |   RelationToExpressionProject[bag][*] - scope-15
    |   |
    |   |---C: New For Each(false,false,true)[bag] - scope-23
    |       |   |
    |       |   Project[chararray][0] - scope-17
    |       |   |
    |       |   Project[int][1] - scope-19
    |       |   |
    |       |   Project[bag][2] - scope-21
    |       |
    |       |---Project[bag][2] - scope-16
    |
    |---A: New For Each(false,false,false)[bag] - scope-10
        |   |
        |   Cast[chararray] - scope-2
        |   |
        |   |---Project[bytearray][0] - scope-1
        |   |
        |   Cast[int] - scope-5
        |   |
        |   |---Project[bytearray][1] - scope-4
        |   |
        |   Cast[bag:{(chararray,int,{(chararray)})}] - scope-8
        |   |
        |   |---Project[bytearray][2] - scope-7
        |
        |---A: Load(file:///home/pradeepg26/projects/pig-ml/data1:PigStorage) - scope-0--------
The execution plans differ only slightly, and both scripts compile to a
single map-only MapReduce job. I don't know which one would perform better;
my guess is that they'll perform about equally well, but it would be
interesting to measure.
Just thought I'd give an alternate way of doing things.
Anyway... Hope this helps. It ended up being a lot longer than I
anticipated. =)
Pradeep
On Wed, Jun 5, 2013 at 7:57 PM, David Parks <[email protected]> wrote:
> Ah, ok, that was very helpful, thanks. I've been able to flatten things out
> now. So now I'm trying to re-group 2 levels of bags that I flattened (after
> doing a join).
>
> After some flatten and join operations I end up with data that looks like
> this:
>
> ('item1',111,'thing1',222,'value1','result1')
> ('item1',111,'thing1',222,'value2','result2')
>
> I need to send this data to a UDF, once per 'item', so I need to re-group 2
> levels up (group values & things). With 5 somewhat inelegant steps I managed
> to re-group the values correctly and project out just the 'thing' fields:
>
> ('thing1',222,{('value1','result1')})
> ('thing1',222,{('value2','result2')})
>
> But now I want to take that form and turn it into something like this (so I
> can re-join that to the original dataset):
>
> 'thing1', {222, {('value1', 'result1'), ('value2', 'result2')} }
>
> I can't seem to make that happen with a GROUP operation: grouping on the bag
> gives an error that the operation isn't supported yet, and grouping on
> 'thing' alone doesn't yield a useful result...
>
> ====================================
>
> For some context, the original problem is this:
>
> A = load 'data6' as ( item:chararray, d:int, things:bag{(thing:chararray,
> d1:int, values:bag{(v:chararray)})} );
> B = load 'data7' as ( v:chararray, r:chararray );
>
> grunt> cat data1
> 'item1' 111 { ('thing1', 222, {('value1'),('value2')}) }
> grunt> cat data2
> 'value1' 'result1'
> 'value2' 'result2'
>
> We're trying to join the 'result1', 'result2' values in [data2] into the
> structure in [data1]. Then we need to call a UDF once per item so it can
> output the data in a specific format.
>
> An 'item' has 0 or more 'things', a thing has 0 or more 'values', and a
> value may or may not have a 'result' (a simple OO structure with nested
> collections, or 3 straightforward SQL tables, for comparison).
>
>
>
> -----Original Message-----
> From: Russell Jurney [mailto:[email protected]]
> Sent: Tuesday, June 04, 2013 6:53 PM
> To: [email protected]
> Subject: Re: Flattening nested bags
>
> B = foreach A generate item, d, flatten(things);
> C = foreach B generate item, d, thing, d1, flatten(values);
>
> On Jun 4, 2013, at 5:46 PM, "David Parks" <[email protected]> wrote:
>
> > We've been working on our first real use case with Pig for quite some
> > time now, still without success. I wonder if someone can provide an
> > answer to this very much simplified version of our problem:
> >
> > Input data:
> > ---------------
> > 'item1' 111 { ('thing1', 222, {('value1'),('value2')}) }
> >
> > Load statement for above data:
> > ----------------------------------------
> > A = load 'data6' as ( item:chararray, d:int,
> > things:bag{(thing:chararray, d1:int, values:bag{(v:chararray)})} );
> >
> > Desired result:
> > ------------------
> > ('item1' 111 thing1 222 value1)
> > ('item1' 111 thing1 222 value2)
> >
> > Questions:
> > ----------------
> > - Is there a single step I can use to flatten this? Or will it require
> > doing 2 steps: first flatten 'things', and then take those results and
> > flatten 'values'?
> > - We're really looking for the syntax to get this right. I've posted a
> > number of questions here and on Stack Overflow with lots of good
> > suggestions, and read through the O'Reilly book online, none of which,
> > though, have gotten me past constant errors like "Cannot find field v
> > in values:bag{:tuple(v:chararray)}"
> > - Should I be working on converting our data to SQL-like table formats
> > rather than this more Object-Oriented format with nested collections?
> >
> > Pseudo-code attempt (I've tried 50+ versions of this in every form I
> > can glean from examples on the internet, with no success):
> > ----------------------------------------------------
> > B = FOREACH A GENERATE item, d, things.thing as thing, d1,
> > FLATTEN(things.values.v) as v;