[ 
https://issues.apache.org/jira/browse/PIG-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12754319#action_12754319
 ] 

Ying He commented on PIG-955:
-----------------------------

The sampling process generates a file that contains the skewed keys and their
pre-allocated reducer indexes. Each (key, beginning index, ending index) is
stored as a tuple.

During the join, SkewedPartitioner loads this file as a lookup table. The key
of each tuple from the partition table is matched against the lookup table. If
a match is found, the partitioner returns a value in the range [beginning
index, ending index] in round-robin fashion. If no match is found, it uses
hash() to calculate the index.
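
The lookup described above can be sketched roughly as follows. This is only an
illustration, not the actual SkewedPartitioner code: the class
SkewedLookupSketch, the Range holder, and the method names are hypothetical,
and the range is assumed to be inclusive and non-wrapping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a skew-aware partition lookup (not Pig's real class).
public class SkewedLookupSketch {
    // Stand-in for one (key, beginning index, ending index) tuple.
    static final class Range {
        final int begin;
        final int end;
        int next; // next reducer index to hand out (round robin)

        Range(int begin, int end) {
            this.begin = begin;
            this.end = end;
            this.next = begin;
        }
    }

    private final Map<Object, Range> lookup = new HashMap<>();
    private final int numReducers;

    SkewedLookupSketch(int numReducers) {
        this.numReducers = numReducers;
    }

    void addSkewedKey(Object key, int begin, int end) {
        lookup.put(key, new Range(begin, end));
    }

    int getPartition(Object key) {
        Range r = lookup.get(key);
        if (r == null) {
            // Not a skewed key: fall back to a hash-based index.
            return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
        }
        // Skewed key: cycle through [begin, end] inclusive.
        int partition = r.next;
        r.next = (r.next == r.end) ? r.begin : r.next + 1;
        return partition;
    }
}
```

With a skewed key registered for the range [2, 4], successive calls for that
key would return 2, 3, 4, 2, and so on, while any other key always maps to the
same hash-based index.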

The problem is that SkewedPartitioner looks up the table using the
PigNullableWritable form of the input tuple, while the lookup table uses the
Pig type Tuple for its keys. Therefore, no match is ever found, and the indexes
are calculated using hash() even for skewed keys. As a result, all the data for
a skewed key goes to the same reducer.
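
The effect of such a key type mismatch can be reproduced with plain Java
collections. The sketch below is an illustration only: Wrapper is a
hypothetical stand-in for a wrapper type like PigNullableWritable, and a
java.util.List stands in for a Pig Tuple. It assumes the wrapper is never
equal to the value it wraps, so the map lookup misses.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: looking up a map with a wrapper object instead of
// the wrapped key never matches, even when the wrapped values are equal.
public class KeyTypeMismatchSketch {
    // Stand-in for a wrapper type; it does not override equals/hashCode.
    static final class Wrapper {
        private final Object value;

        Wrapper(Object value) {
            this.value = value;
        }

        Object getValue() {
            return value;
        }
    }

    // Lookup table keyed by the unwrapped type (a List standing in for Tuple).
    static Map<Object, int[]> buildLookup() {
        Map<Object, int[]> lookup = new HashMap<>();
        lookup.put(Arrays.asList("hot"), new int[] {2, 4});
        return lookup;
    }

    // Mirrors the bug: the wrapper itself is used as the lookup key.
    static boolean foundWithWrapper(Map<Object, int[]> lookup, Wrapper w) {
        return lookup.get(w) != null;
    }

    // Mirrors the fix: unwrap first, then look up with the matching type.
    static boolean foundAfterUnwrap(Map<Object, int[]> lookup, Wrapper w) {
        return lookup.get(w.getValue()) != null;
    }
}
```

Here the first lookup misses and the second one hits, which is the same shape
as the fix: use the object type that the table was built with.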

For the streaming table, however, if the key is a skewed key, each tuple is
replicated to every reducer that was pre-allocated for that key during the
sampling process.

Because the reducer indexes are calculated incorrectly for skewed keys in the
partition table, tuples from the first table are sent to the wrong reducers. If
the hash-based index does not fall into the key's pre-calculated index range,
the join with the second table produces an empty data set for that key. The
query still appears to succeed, but data is lost.
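
A small simulation makes the data loss concrete. It is a sketch under
simplifying assumptions, not Pig code: the class and method names are
hypothetical, only a single skewed key is modeled, and the streaming table is
assumed to contribute one tuple that is replicated to every reducer in the
inclusive range [begin, end].

```java
// Hypothetical simulation of the join for one skewed key.
public class SkewedJoinLossSketch {
    // partitionSideReducers[i] is the reducer that the i-th partition-table
    // tuple was sent to. Returns the number of joined rows for the key.
    static int joinedRows(int numReducers, int begin, int end,
                          int[] partitionSideReducers) {
        int[] partitionSide = new int[numReducers];
        int[] streamingSide = new int[numReducers];

        for (int reducer : partitionSideReducers) {
            partitionSide[reducer]++;
        }
        // One streaming tuple, replicated to each pre-allocated reducer.
        for (int reducer = begin; reducer <= end; reducer++) {
            streamingSide[reducer]++;
        }

        // Each reducer only joins the tuples it actually received.
        int joined = 0;
        for (int reducer = 0; reducer < numReducers; reducer++) {
            joined += partitionSide[reducer] * streamingSide[reducer];
        }
        return joined;
    }
}
```

With the range [2, 4], three partition-table tuples spread over reducers 2, 3
and 4 yield three joined rows. If the same three tuples all land on a
hash-chosen reducer outside the range, say 7, the result is zero rows and no
error is raised.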

The fix is to change SkewedPartitioner to use the correct object type when
looking up the skewed key table.



> Skewed join generates  incorrect results 
> -----------------------------------------
>
>                 Key: PIG-955
>                 URL: https://issues.apache.org/jira/browse/PIG-955
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Ying He
>         Attachments: PIG-955.patch
>
>
> Fragmented replicated join has a few limitations:
>  - One of the tables needs to be loaded into memory
>  - Join is limited to two tables
> Skewed join partitions the table and joins the records in the reduce phase. 
> It computes a histogram of the key space to account for skewing in the input 
> records. Further, it adjusts the number of reducers depending on the key 
> distribution.
> We need to implement the skewed join in pig.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
