[ 
https://issues.apache.org/jira/browse/PIG-871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12731389#action_12731389
 ] 

Ankur commented on PIG-871:
---------------------------

Sorry for coming back late in this. 

The problem mostly manifest itself in case of large datasets which is skewed in 
favour of few hundred group keys. The typical approach that user adopts in this 
case is to generate an additional group key ( typically an integer) in a random 
or round-robin fashion, group on original + generated and increase the 
parallelism and task heap size to see if that works. 

A use case is that user wants to separate out the data into 5 - 10 major 
buckets (directories) and for each major bucket he wants to have data separated 
out into 20 - 100 minor buckets (again directories).

An example script would look like this

data1 = LOAD 'myfile_1' Using PigStorage() as (f1, f2, f3);
data2 = LOAD 'myfile_2' Using PigStorage() as (f1, f4, f5);
filtered = FILTER data By <some condition>
joined_data = JOIN data1 By f1, data2 By f1 Parallel 100
projected_data = FOREACH joined_data generate f1, GenerateRandomIntUpto(100) as 
part, f2,f3,f4,f5 ;
SPLIT projected_data INTO major_bucket_1 IF <some_condition>, major_bucket_2 IF 
<some_condition>, major_bucket_3 IF <some_condition>;

group1= Group major_bucket_1 By (f1, part) Parallel 100;
result1 = FOREACH group1 generate FLATTEN(group),  FLATTEN($1);
STORE result1 INTO 'major_bucket_1' Using CustomStore();

group2= Group major_bucket_2 By (f2, part) Parallel 100;
result2 = FOREACH group2 generate FLATTEN(group),  FLATTEN($1);
STORE result2 INTO 'major_bucket_2' Using CustomStore();

group3= Group major_bucket_3 By (f3, part) Parallel 100;
result3 = FOREACH group3 generate FLATTEN(group),  FLATTEN($1);
STORE result3 INTO 'major_bucket_3' Using CustomStore();

The expectation here is to get the final result in a directory structure like 
this

major_bucket_1/minor_bucket_1/part-files
major_bucket_1/minor_bucket_2/part-files
major_bucket_1/minor_bucket_3/part-files
...

major_bucket_2/minor_bucket_1/part-files
major_bucket_2/minor_bucket_2/part-files
major_bucket_2/minor_bucket_3/part-files
..

Note:- The minor_bucket directory name is derived from the group key.

> Improve distribution of keys in reduce phase
> --------------------------------------------
>
>                 Key: PIG-871
>                 URL: https://issues.apache.org/jira/browse/PIG-871
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Ankur
>
> The default hashing scheme used to distribute keys in reduce phase sometimes 
> results in an uneven distribution of keys resulting in 5 - 10 % of reducers 
> being overloaded with data. This bottleneck makes the PIG jobs really slow 
> and gives users a bad impression.
> While there is no bullet proof solution to the problem in general, the 
> hashing can certainly be improved for better distribution. The proposal here 
> is to evaluate and incorporate other hashing schemes that give high avalanche 
> and more even distribution. We can start by evaluating MurmurHash which is 
> Apache 2.0 licensed and freely available here - 
> http://www.getopt.org/murmur/MurmurHash.java

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to