The following page has been changed by AlanGates:
http://wiki.apache.org/pig/PigMergeJoin

New page:
= Pig Merge Join =

== Problem Statement ==
Often users' data is stored such that both inputs are already totally sorted on the join key. In this case, it is possible to join the data in the map phase of a map reduce job. This will provide a significant performance speedup compared to passing all of the data through unneeded sort and shuffle phases.

== Proposed Solution ==
Pig will implement a merge join algorithm (or sort-merge join, although in this case the sort is assumed to have already been done). As with other join algorithm choices, Pig will not attempt to make a choice for the user at this point. The user will instruct Pig to use this algorithm with a using clause:

{{{
    C = join A by $0, B by $0 using "merge";
}}}

Pig will implement this algorithm by selecting the left input of the join to be the input file for the map phase, and the right input of the join to be the side file. It will then sample records from the right input to build an index that contains, for each sampled record, the key and the offset into the file at which the record begins. This sampling will be done in an initial map-only job. A second MR job will then be initiated, with the left input as its input. Each map will use the index to seek to the appropriate record in the right input and begin doing the join.
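
The index lookup described above can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not Pig's implementation: the class !MergeJoinIndex and its methods add and seekOffset are hypothetical, the index is held in memory, and keys may be any Comparable type. It shows why the lookup must choose the last entry strictly less than the first left key rather than an entry equal to it.

{{{
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sampled right-side index: one (key, offset)
// entry per sampled record, appended in file order, which is also key order
// because the right input is totally sorted on the join key.
public class MergeJoinIndex<K extends Comparable<K>> {
    private final List<K> keys = new ArrayList<>();
    private final List<Long> offsets = new ArrayList<>();

    // Records that a sampled record with this key begins at this byte offset.
    public void add(K key, long offset) {
        keys.add(key);
        offsets.add(offset);
    }

    // Offset to seek to before joining a left split whose first key is
    // firstLeftKey: the offset of the last entry with key strictly less than
    // firstLeftKey, or 0 (start of file) when no such entry exists. An entry
    // whose key equals firstLeftKey is never chosen, because earlier records
    // with the same key may precede it in the file.
    public long seekOffset(K firstLeftKey) {
        int lo = 0;
        int hi = keys.size();
        // Binary search for the first entry whose key is >= firstLeftKey.
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (keys.get(mid).compareTo(firstLeftKey) < 0) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo == 0 ? 0L : offsets.get(lo - 1);
    }

    public static void main(String[] args) {
        MergeJoinIndex<Integer> index = new MergeJoinIndex<>();
        index.add(1, 0L);
        index.add(5, 4096L);
        index.add(5, 8192L);
        index.add(9, 12288L);
        System.out.println(index.seekOffset(5));  // 0
        System.out.println(index.seekOffset(7));  // 8192
        System.out.println(index.seekOffset(10)); // 12288
    }
}
}}}

Because the right input is sorted, the sampled entries are already in key order, so a binary search is enough; no separate sort of the index is needed.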

== Details ==
=== Logical Plan ===
In the logical plan, use of this join will be recorded in !LOJoin (similar to the way fragment-replicate join and skew join are). (The work to convert FR join and skew join to use a common !LOJoin is not yet done; we should coordinate work on this join with the work on skew join to avoid duplicating effort.)

=== Physical Plan ===
In the physical plan a !POMergeJoin operator will be created.  It will contain 
the logic to implement the join.  The logic will be:

{{{
    open left input;
    read first key;
    open index;
    // Select an entry strictly less than the first left key: even if an entry
    // equals it, earlier right records with that key may precede that entry.
    // If no such entry exists, start at the beginning of the right input.
    find last entry in index < first left key;
    open right input;
    seek to offset in right input indicated by selected index entry;
    while (left keys && right keys) {
        advance right input until right key >= left key;
        if (right key == left key) {
            read left records until key changes, storing records into list;
            read right records until key changes, joining each right record with each left record in list;
        } else {
            advance left input;
        }
    }
}}}
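
The pseudocode above can be rendered as a runnable Java sketch over two in-memory lists. The names !MergeJoinSketch, Rec, and mergeJoin are hypothetical, join keys are assumed to be ints, and a real !POMergeJoin would stream tuples from its map input and the side file rather than index into lists.

{{{
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch of the POMergeJoin loop. Both inputs are
// lists of records already sorted ascending on an int join key.
public class MergeJoinSketch {
    static final class Rec {
        final int key;
        final String value;

        Rec(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Inner join; each output string has the form "key:leftValue|rightValue".
    static List<String> mergeJoin(List<Rec> left, List<Rec> right) {
        List<String> out = new ArrayList<>();
        int l = 0;
        int r = 0;
        while (l < left.size() && r < right.size()) {
            int leftKey = left.get(l).key;
            // Advance the right input until right key >= left key.
            while (r < right.size() && right.get(r).key < leftKey) {
                r++;
            }
            if (r == right.size()) {
                break; // right input exhausted: no further matches possible
            }
            if (right.get(r).key == leftKey) {
                // Buffer every left record with this key.
                List<Rec> buffered = new ArrayList<>();
                while (l < left.size() && left.get(l).key == leftKey) {
                    buffered.add(left.get(l++));
                }
                // Stream right records with this key, joining each one with
                // every buffered left record.
                while (r < right.size() && right.get(r).key == leftKey) {
                    Rec rightRec = right.get(r++);
                    for (Rec leftRec : buffered) {
                        out.add(leftKey + ":" + leftRec.value + "|" + rightRec.value);
                    }
                }
            } else {
                l++; // right key > left key, so this left record has no match
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec(1, "a"), new Rec(2, "b"), new Rec(2, "c"), new Rec(4, "d"));
        List<Rec> right = List.of(new Rec(2, "x"), new Rec(2, "y"), new Rec(3, "z"), new Rec(4, "w"));
        System.out.println(mergeJoin(left, right));
        // [2:b|x, 2:c|x, 2:b|y, 2:c|y, 4:d|w]
    }
}
}}}

As in the pseudocode, only the left records for the current key are buffered, while the right records with that key are streamed past them.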
            
=== Map Reduce Plan ===

The MR compiler will introduce a sampling MR job before the MR job that contains the !POMergeJoin. (The sampling algorithm is described below.) This sampling job can read as input the output of the previous map reduce job (or, if there is no previous map reduce job, the initial input file) even if there are physical operators before the !POMergeJoin in the current MR job. That is, there is no need to create an MR boundary immediately before the sampling as there is with order by or skew join. For example:

{{{
    A = load 'input1';
    B = load 'input2';
    C = filter A by $1 is not null;
    D = join B by $0, C by $0 using "merge";
}}}

can produce a map reduce plan:

{{{
    Job 1:
        Map: JoinSampleLoader 
        Reduce:

    Job 2:
        Map: filter->join
        Reduce:
}}}

The reason for this difference is that the filter does not change where the remaining records are located in the input file, so the sample need not be taken after the filter, whereas in the skew join and order by cases the skew of the keys may be affected by the filter.

The sampling algorithm will need to record the key and the offset into the input file at which the record begins. This can be done by subclassing !RandomSampleLoader to create a sampler that appends the offset information to the tuple. This will avoid recreating the sampling algorithm and allow the sampler to benefit from planned enhancements of !RandomSampleLoader.

How many records per block to sample (and thus how large to make the index) is not clear. Initially we should sample one record per block. We can then experiment to understand the space and performance trade-offs of sampling more records per block.
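
A one-record-per-block sampler can be sketched as below. This is hypothetical: the design plans to subclass !RandomSampleLoader, which chooses records randomly, whereas this sketch simply takes the first record that begins in each block. It also assumes newline-delimited, tab-separated text with one byte per character, so that string lengths equal byte offsets. Each !IndexEntry stands in for a sampled tuple with its offset appended; !BlockSampler and sampleOnePerBlock are made-up names.

{{{
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a one-record-per-block sampler for the right input.
// Reads newline-delimited "key<TAB>value" text from a String, assumes one
// byte per character, and samples the first record that begins in each block.
public class BlockSampler {
    // One index entry: the sampled record's key and its starting byte offset.
    static final class IndexEntry {
        final String key;
        final long offset;

        IndexEntry(String key, long offset) {
            this.key = key;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + offset + ")";
        }
    }

    static List<IndexEntry> sampleOnePerBlock(String data, long blockSize) {
        List<IndexEntry> index = new ArrayList<>();
        long offset = 0;     // byte offset at which the current record begins
        long lastBlock = -1; // block that the most recent sample came from
        for (String line : data.split("\n")) {
            long block = offset / blockSize;
            if (block != lastBlock) {
                // First record starting in this block: record its key and offset.
                String key = line.split("\t", 2)[0];
                index.add(new IndexEntry(key, offset));
                lastBlock = block;
            }
            offset += line.length() + 1; // +1 for the newline terminator
        }
        return index;
    }

    public static void main(String[] args) {
        // Four 4-byte records ("a\t1\n", ...) and 8-byte blocks: two blocks.
        String data = "a\t1\nb\t2\nc\t3\nd\t4\n";
        System.out.println(sampleOnePerBlock(data, 8)); // [(a, 0), (c, 8)]
    }
}
}}}

Sampling more records per block would only change the condition that decides when to emit an entry; the index format and the lookup stay the same.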

=== Local Mode ===
In local mode, !LOJoin should not be translated to !POMergeJoin, even when the user requests a sort merge join. We do not need to implement a version of this join that does not require the sampling.

== Outer Join ==
This design will work for inner joins and, with slight modifications, for left outer joins. It will not work for right outer or full outer joins. If we wish to extend it to those cases in the future, it will have to be modified to also sample the left input. The reason is that in the current implementation !POMergeJoin does not know how far past the end of its input to keep accepting non-matching keys on the right side. It will need to know the key on which the next block of the left input starts in order to determine when it should stop reading keys from the right input. A sampling pass on the left input that reads the first key of each block could provide this information.
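
The ownership rule that such a left-side sample would enable can be sketched in Java. This is a hypothetical illustration, not part of the design: firstKeys is assumed to hold the first key of every left block in block order, and ownsRightKey (a made-up name) decides which map emits a right-side key that has no match on the left. Under this rule, the map reading left block i would stop reading the right input once it reaches the first key of block i + 1.

{{{
import java.util.List;

// Hypothetical sketch for a right or full outer merge join. firstKeys holds
// the first join key of each left-input block, in block order, as a sampling
// pass over the left input might produce. Map i (the map reading left block
// i) owns right keys in [firstKeys[i], firstKeys[i + 1]); the first map also
// owns every key below firstKeys[0] and the last map every key from its own
// first key upward, so each unmatched right key is emitted by exactly one map.
public class OuterJoinRanges {
    static boolean ownsRightKey(List<Integer> firstKeys, int mapIndex, int rightKey) {
        boolean aboveLower = mapIndex == 0 || rightKey >= firstKeys.get(mapIndex);
        boolean belowUpper = mapIndex == firstKeys.size() - 1
                || rightKey < firstKeys.get(mapIndex + 1);
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        List<Integer> firstKeys = List.of(10, 20, 30);
        for (int key : new int[] {5, 25, 99}) {
            for (int map = 0; map < firstKeys.size(); map++) {
                if (ownsRightKey(firstKeys, map, key)) {
                    // Prints: 5 -> map 0, 25 -> map 1, 99 -> map 2
                    System.out.println("right key " + key + " -> map " + map);
                }
            }
        }
    }
}
}}}

Because the block boundaries partition the key space, no unmatched right key is dropped or emitted twice, which is exactly the information the current design lacks.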

== Multiway Join ==
This algorithm could theoretically be extended to support joins of three or more inputs. For now it will not be. Pig will give an error if users give more than two inputs to a merge join. Users who wish to join three or more inputs with this algorithm can decompose their join into a series of two-way joins.

Reply via email to