Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "MapSideCogroup" page has been changed by AlanGates.
http://wiki.apache.org/pig/MapSideCogroup

--------------------------------------------------

New page:
= Pig Map Side Cogroup Proposal =

This document details the proposal to build a mapside cogroup operator in Pig.

== Restrictions ==
The map side cogroup would initially work with the following restrictions:

 1. All loaded files must be of the same storage type (no cogrouping Zebra and 
!PigStorage files).
 1. There must exist a loader for this storage type that implements Pig's 
!IndexableLoadFunc interface (currently only Zebra's !TableLoader meets this 
criteria.)
 1. The loader used for this must guarantee that it does not split a single 
value of a key across multiple splits.
 1. Inputs to cogroup must be aliases of load statements. No other operators 
are allowed.

== Design ==

The map side cogroup would work as follows:

 1. Add an interface !KeyValuesNotSplitLoader.  This interface will have only 
one method.  When called this method will indicate to the loader that it needs 
to not split keys across splits for this particular load.
 1. Run an initial MR job to build an index on the left most (assumably the 
largest) input using Pig's !MergeJoinIndexer
 1. Pig will need to construct the second MR job such that splits from the left 
most input are used as the splits for the job.  Loads of the other inputs 
(assumably the smaller inputs) will be done via the !IndexableLoadFunc.
 1. Currently !PigSplit contains the split number.  This information will need 
to be recorded in the UDFContext so that it can be retrieved by POMergeCogroup 
operator.  This transfer should probably be done in 
!PigInputFormat.createRecordReader.
 1. Inside a new POMergeCogroup operator:
{{{
    Determine which split we are in
    if (not last split) {
        Using the index generated by the MergeJoinIndexer,
              determine the first key of the next split
    }
    if (in first split) {
        open each of the other inputs at the beginning
    } else {
        Using the index generated in 1, determine the first key of our split
        open each of the other inputs at this first key 
    }

    construct a heap, using the cogrouping key as the key for the heap
    foreach (input) {
        pull first record
        annotate with input number it came from
        insert into heap
    }
    while (there are records in heap) {
        pull top record from heap
        place in bag based on which input it came from
        if (key from large && EOF on split) continue
        pull record from input that last record from top of heap came from
        if (key pulled < first key in next split) insert into heap
    }
    output final record
}}}
    

== Example ==
Assume a set of tables `mytables` with one large file and n smaller files.  
These files have schemas:

{{{
large: (k, x, y)
small1: (k, x, z)
...
smalln: (k, z, alpha)
}}}

Then the Pig Latin script would look like:

{{{
A = load 'large';
B1 = load 'small1';
...
Bn = load 'smalln';
C = cogroup A by k, B1 by k, ..., Bn by k using "merge";
}}}

And C would have a schema of:
{{{
k, bag: large{(k, x, y)}, bag: small1{(k, x, z)}, ..., bag: smalln{(k, z, 
alpha)}
}}}

Reply via email to