Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by PradeepKamath:
http://wiki.apache.org/pig/PigMergeJoin

------------------------------------------------------------------------------
  }}}
  
  Pig will implement this algorithm by selecting the left input of the join to 
be the input file for the map phase, and the right input of the join to be the 
side file.
  It will then sample records from the right input to build an index that contains, for each sampled record, the key(s), the filename, and the offset in the file at which the record begins.  This sampling will be done in an initial map-only job.  A second MR job will then be initiated, with the left input as its input.  Each map will use the index to seek to the appropriate record in the right input and begin doing the join.
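
  As a rough illustration only (not Pig's actual classes), the sketch below shows what such an index and the per-map lookup could look like, assuming a single string join key and a simple in-memory map from sampled keys to (filename, offset) pairs; all names here are hypothetical.

  {{{
  import java.util.Map;
  import java.util.NavigableMap;
  import java.util.TreeMap;

  // Hypothetical index entry: one sampled record of the right (side) input.
  class IndexEntry {
      final String fileName;   // file of the right input that contains the sampled record
      final long offset;       // byte offset at which the sampled record begins
      IndexEntry(String fileName, long offset) { this.fileName = fileName; this.offset = offset; }
  }

  // Sketch of the index built by the sampling job and consulted by each map
  // of the second job before it starts the merge.  Assumes a non-empty index.
  class MergeJoinIndexSketch {
      private final NavigableMap<String, IndexEntry> entries = new TreeMap<>();

      void add(String sampledKey, String fileName, long offset) {
          entries.put(sampledKey, new IndexEntry(fileName, offset));
      }

      // For the first left key a map task sees, return the place in the right
      // input to seek to: the last sampled entry whose key is <= that left key.
      IndexEntry seekPointFor(String firstLeftKey) {
          Map.Entry<String, IndexEntry> e = entries.floorEntry(firstLeftKey);
          return (e != null) ? e.getValue() : entries.firstEntry().getValue();
      }
  }
  }}}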
  
  == Pre conditions for merge join ==
  In the first release, merge join will only work under the following conditions:
     * Both inputs are sorted in *ascending* order of join keys. If an input consists of many files, there should be a total ordering across the files in the *ascending order of file name*. So, for example, if one of the inputs to the join is a directory called input1 with files a and b under it, the data should be sorted in ascending order of the join key when read starting at a and ending at b. Likewise, if an input directory has part files part-00000, part-00001, part-00002 and part-00003, the data should be sorted if the files are read in the sequence part-00000, part-00001, part-00002, part-00003. (A simple, illustrative check of this ordering is sketched after this list.)
     * The merge join only has two inputs
     * The loadfunc for the right input of the join should implement the 
SamplableLoader interface (PigStorage does implement the SamplableLoader 
interface).
     * Only inner join will be supported
        * There should be no UDFs in the foreach statement
        * The foreach statement should not change the position of the join keys
       * There should be no transformation on the join keys which would change the sort order
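
  As an illustration of the total-ordering requirement in the first bullet above, here is a small, hypothetical check (not part of Pig) that join keys never decrease when the part files of a directory are read in ascending file-name order; it assumes plain text files with one key per line.

  {{{
  import java.io.BufferedReader;
  import java.io.File;
  import java.io.FileReader;
  import java.io.IOException;
  import java.util.Arrays;

  public class SortedInputCheck {
      // Returns true if keys are non-decreasing when the files are read in
      // ascending order of file name (e.g. part-00000, part-00001, ...).
      public static boolean isTotallyOrdered(File inputDir) throws IOException {
          File[] parts = inputDir.listFiles();
          if (parts == null) return true;       // not a directory, or empty
          Arrays.sort(parts);                   // File sorts by path name, ascending
          String previousKey = null;
          for (File part : parts) {
              try (BufferedReader reader = new BufferedReader(new FileReader(part))) {
                  String key;
                  while ((key = reader.readLine()) != null) {
                      if (previousKey != null && key.compareTo(previousKey) < 0) {
                          return false;         // a key decreased: not sorted across files
                      }
                      previousKey = key;
                  }
              }
          }
          return true;
      }
  }
  }}}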
  === Performance pre condition ===
     * For optimal performance, each part file of the left (sorted) input of the join should have a size of at least one HDFS block (for example, if the HDFS block size is 128 MB, each part file should be > 128 MB). If the total input size (including all part files) is < a block size, then the part files should be uniform in size (without large skews in sizes). The main idea is to eliminate skew in the amount of input the final map job performing the merge join will process.

  In local mode, merge join will fall back to regular join.
  
  == Implementation Details ==
  === Logical Plan ===
          advance right input until right key >= left key;
          if (right key == left key) {
              read left records until key changes, storing records into list;
            while (right key is the same) {
                join right record with each left record in list;
                read next right record;
            }
          } else {
              advance left input;
          }
  === Map Reduce Plan ===
  
  The MR compiler will introduce a sampling MR job before the MR job that 
contains the !POMergeJoin.  (The sampling algorithm is described below.)
  This sampling job can read as input the output of the previous map reduce job (or, if there is no previous map reduce job, the initial input file) even if there are physical operators before the !POMergeJoin in the current MR job.  No MR boundary is created immediately before the sampling as there is with order by or skew join.  For example:
  
  {{{
      A = load 'input1';

          Reduce:
  }}}
  
  The reason for this difference is that the key location in the file is not affected by the filter, and thus the sample need not be taken after the filter, whereas in the skew join and order by cases the skew of the key may be affected by the filter.
  
  The sampling algorithm will need to record the key, the filename, and the offset in the input file at which the record begins.  This is done by MergeJoinIndexer, which extracts the keys from the input tuple and appends the filename and offset.
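
  A rough sketch of what building one index entry could look like; the helper below is illustrative only and is not the actual MergeJoinIndexer code.

  {{{
  import java.util.ArrayList;
  import java.util.List;

  // Illustrative only: builds one index "tuple" of (key(s), filename, offset)
  // for a sampled record of the right input.
  class IndexEntryBuilder {
      // joinKeys: the join key field(s) already extracted from the sampled record
      // fileName/offset: the file and the byte offset at which that record begins
      static List<Object> buildEntry(List<Object> joinKeys, String fileName, long offset) {
          List<Object> entry = new ArrayList<Object>(joinKeys);  // key(s) first, in join-key order
          entry.add(fileName);                                   // then the containing file
          entry.add(offset);                                     // then the offset the record begins at
          return entry;
      }
  }
  }}}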
  
  How many records per block to sample (and thus how large to make the index) is not clear.  Initially we should have it sample one record per block.  We can then experiment to understand the space and performance trade-offs of increasing the number of records sampled per block.
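
  To get a feel for the index size under one-sample-per-block, a back-of-the-envelope example (the 10 GB figure here is made up, not from the benchmark below):

  {{{
  // Example only: a 10 GB right input with a 128 MB block size and one sampled
  // record per block gives roughly 10 * 1024 / 128 = 80 index entries.
  long rightInputBytes = 10L * 1024 * 1024 * 1024;                              // 10 GB (hypothetical)
  long blockSizeBytes  = 128L * 1024 * 1024;                                    // 128 MB HDFS block
  long indexEntries = (rightInputBytes + blockSizeBytes - 1) / blockSizeBytes;  // = 80
  }}}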
  
  === Local Mode ===
  In local mode, !LOJoin should not be translated to !POMergeJoin, even when the user requests a sort merge join.  We do not need to implement a version of this join that does not require the sampling.
  
  == Outer Join ==
  This design will work for inner joins, and with slight modifications for left outer joins.  It will not work for right outer or full outer joins.  If we wish to extend it to work for those cases at some point in the future, it will have to be modified to also sample the left input.  The reason for this is that in the current implementation !POMergeJoin does not know how far past the end of its input to keep accepting non-matching keys on the right side.  It will need to know what key the next block of the left input starts on in order to determine when it should stop reading keys from the right input.  A sampling pass on the left input that reads the first key of each block could provide this information.  (Is the intent that each map task will, at the end of its input, continue reading keys from the right side up to the first key in the next block and perform the outer join there, with the outer join from the first key of the next block onwards being handled by the map task corresponding to that block? The extra corner case is that, for the first key of the left input, the outer join for all the right keys less than that key will need to be done by the map task processing that first key (the first key would be the first entry in the index for the left side).)
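
  A minimal sketch of the stopping rule discussed above, for a hypothetical right/full outer extension: after its own left input is exhausted, a map task would keep consuming right-side keys only while they are smaller than the first key of the next left block (taken from a left-side index that does not exist today). All names are illustrative; this is not implemented.

  {{{
  // Illustrative only: should this map task keep reading right-side records
  // after its left input is exhausted (hypothetical outer-join extension)?
  class OuterJoinBoundSketch {
      // nextLeftBlockFirstKey: first key of the next block of the left input,
      // or null if this map task is processing the last block.
      static boolean keepReadingRight(String rightKey, String nextLeftBlockFirstKey) {
          if (nextLeftBlockFirstKey == null) {
              return true;   // last map task: it owns all remaining right-side keys
          }
          // Right keys >= the next block's first key belong to the next map task.
          return rightKey.compareTo(nextLeftBlockFirstKey) < 0;
      }
  }
  }}}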
  
  In the current implementation (r806281), only inner joins are supported.
  
  == Multiway Join ==
  This algorithm could theoretically be extended to support joins of three or more inputs.  For now it will not be.  Pig will give an error if users give more than two inputs to a merge join.  If users wish to do three-plus-way joins with this algorithm, they can decompose their joins into a series of two-way joins.
  
  ----
  We benchmarked the performance of merge join. The numbers are in the table below.
