Sounds like your UDF is doing exactly what MultiStorage is doing. If I am
correct here, then MultiStorage should do the job for you and you should not
need to write a separate, similar UDF. Like I said, a few thousand groups
should not be an issue for a small to medium sized cluster, given that it is
configured correctly.
I am taking a guess here, but you might be hitting an uneven data distribution
issue where most of your groups go to a few reducers, putting a strain on the
memory of the reducers running the STORE function. If that's the case, then you
can get around the issue by generating a random value, grouping on both the
key field and the random value, and then projecting the random value out before
storing.
A1 = FOREACH A GENERATE f1, f2, ..., RANDOM() AS rand;
B = GROUP A1 BY (f1, rand);
C = FOREACH B GENERATE FLATTEN(A1.(f1, f2, ...)); -- no 'rand' here
STORE C INTO '/my/home/tmp' USING MultiStorage('/my/home/actual-dir', '0');
If you discover other issues with MultiStorage, then please open a JIRA.
Regards
-...@nkur
On 2/2/10 11:51 PM, "Jennie Cochran-Chinn" <[email protected]> wrote:
Thanks for the clarification. We went down the path of using a UDF
inside the FOREACH after the GROUP as, yes, there are >5k unique
groups. We can't reduce the number of unique groups, as there is a
downstream application whose requirements we must meet.
To further the question, our current solution is of this form:
A = load 'data';
B = group A by $0;
C = foreach B generate storeUdf(*);
where storeUdf(*) opens a storage stream for the individual groups and
we get around the # of open streams issue. Do you have any pointers
on opening/closing the stream and binding to PigStorage inside the
storeUdf function? We mimic how MultiStorage opens and closes
streams/PigStorage - is there anything else there I should be looking
out for or is that pretty standard?
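To make the question concrete, here is a rough sketch of what our storeUdf
roughly does (class and argument names are made up for illustration, and rather
than binding an actual PigStorage instance it just writes tab-delimited records
the way PigStorage does by default):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Illustrative only -- writes one group's bag to its own file and closes the
// stream before returning, so each reducer keeps at most one stream open.
public class GroupStoreUdf extends EvalFunc<Long> {
    private final String baseDir;   // e.g. an hdfs:// or s3n:// directory (hypothetical argument)

    public GroupStoreUdf(String baseDir) {
        this.baseDir = baseDir;
    }

    @Override
    public Long exec(Tuple input) throws IOException {
        // After GROUP A BY $0, each input tuple is (key, {bag of A's tuples})
        Object key = input.get(0);
        DataBag bag = (DataBag) input.get(1);

        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out = fs.create(new Path(baseDir, key.toString()), true);
        try {
            for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
                // Tab-delimited, newline-terminated records, as PigStorage writes by default
                out.write(it.next().toDelimitedString("\t").getBytes("UTF-8"));
                out.write('\n');
            }
        } finally {
            out.close();   // close per group so open streams never accumulate
        }
        return bag.size(); // number of tuples written, so the FOREACH has an output
    }
}

Opening and closing the stream inside exec() for each group keeps only one
stream open per reducer at a time, at the cost of a create/close round trip
per group.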
Thanks!
Jennie
On Feb 2, 2010, at 4:33 AM, Ankur C. Goel wrote:
> Jennie,
> A hadoop cluster has an enforced limit on the number of
> concurrent streams that can be kept open at any time.
> This limit comes from the number of concurrent threads that a datanode can
> run for doing I/O, specified by the cluster-level config
> parameter dfs.datanode.max.xcievers.
> So the max number of open streams = Number of nodes * threads per
> datanode.
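> For example, with 50 datanodes each allowing 256 xceiver threads (purely
> illustrative numbers), that would be roughly 50 * 256 = 12,800 open
> streams across the cluster.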
>
> MultiStorage can do what you want but is constrained by the above
> limit rather than by anything in the UDF itself, because going past the
> limit will cause the datanode to drop connections.
> Also, it is not a good idea to use MultiStorage if you expect more than
> a few thousand unique groups as outputs from your reducers.
>
> Try reducing the number of unique groups before storing. You should
> be able to do it via a simple UDF.
>
> -...@nkur
>
>
> On 2/1/10 11:32 AM, "Rekha Joshi" <[email protected]> wrote:
>
> If it is Pig 0.3 or higher, you would be able to just use the STORE command
> multiple times in the Pig script to store results directly into HDFS.
> A = LOAD ...
> ...
> B = GROUP A ...
> C = GROUP A ...
> ...
> STORE B ...
> STORE C ...
>
> Also look into
> http://hadoop.apache.org/pig/docs/r0.3.0/piglatin.html#Multi-Query+Execution
> I do not know how big your data set is, but you might be able to increase
> the memory parameters to be able to do it in a single script.
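> (As a purely illustrative example, one commonly adjusted knob is the
> reducer heap via mapred.child.java.opts, e.g. -Xmx1024m; which parameters
> actually help will depend on your setup.)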
>
> Cheers,
> /R
>
> On 1/30/10 7:36 AM, "Jennie Cochran-Chinn" <[email protected]>
> wrote:
>
> I had a question about storing data to different files. The basic
> gist of what we are doing is taking a large set of data, performing a
> group by and then storing each group's dataBag into a distinct file
> (on S3). Currently we are using a UDF inside a FOREACH loop that
> writes the dataBag to a local tmp file and then pushes it to S3. This
> does not seem to be the ideal way to do this and we were wondering if
> anyone had any suggestions. I know there is the MultiStorage function
> in the piggybank, but given that we have many different groups, it
> does not appear that it would scale very well. For instance, in some
> experiments the cluster I was using could not open new streams and
> thus failed.
>
> Thanks,
> Jennie
>
>