[Pig Wiki] Update of "PigMergeJoin" by AshutoshChauhan

2009-08-20 Thread Apache Wiki
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

--
  inputs to a merge join.  If users wish to do three plus way joins with this 
algorithm they can decompose their joins into a series of two ways joins.
  
  
+ We benchmarked the performance of merge join; the numbers are in the table below. 
+ Since joins produce a large number of output rows (n-squared in the worst case), the 
output rows are filtered out and not written to disk, so runtimes consist mostly of CPU 
time.
+ Data size is approximately 1.5 GB for 1M rows.
+ 
  == Performance Benchmark ==
- ||# of rows || # of rows || Sym-Hash Join || Merge-Join||
+ ||# of rows || # of rows || Sym-Hash Join || Merge-Join||Remarks||
  ||100K||100K||1m30.588s|| 0m41.764s||
  ||5M  ||  5M|| 1m5.579s|| 0m51.535s||
  ||15M||15M||  2m55.812s|| 2m16.906s||
@@ -118, +122 @@

  ||20M||20M|| 10m30.819s|| 5m42.302s||
  ||20M||  20M|| 12m25.878s|| 5m42.302s||
  ||20M||20M|| 15m56.533s|| 7m57.646s||
- ||20M || 20M|| 112m20.478s||17m16.542s||   
+ ||20M||  20M||112m20.478s||17m16.542s||Zipf Key Distribution||
- ||100M || 20M|| 30mins||  15mins ||
+ ||100M|| 20M|| 30mins||   15mins ||
- ||50M || 100M|| 42mins||28mins||   
+ ||50M || 100M|| 42mins ||28mins||
- ||50M || 100M|| 41mins, 56sec||   28mins||   
- ||50M || 100M|| 42mins||  28mins  ||   
+ ||50M || 100M|| 42mins   ||28mins||
+ ||50M || 100M|| 42mins   ||  28mins  ||
- ||100M ||100M|| 74mins||  44mins||   
+ ||100M|| 100M|| 74mins   ||44mins||
  
+ In our tests, we found merge join to be faster than symmetric hash join by anywhere 
from 30% to 100%, depending on the size of the data, skew in the keys, the number of map waves 
run, etc.
+ One particular thing that affects merge join is how the input data is 
distributed among files. We recommend that all input files be 
approximately equal in size, or at least as large as the block size.
  
  == Phase 2 ==
  


[Pig Wiki] Update of "PigMergeJoin" by AshutoshChauhan

2009-08-20 Thread Apache Wiki

The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

--
  
  
  == Performance Benchmark ==
- ||# of rows || # of rows || Sym-Hash Join || Merge-Join || Skew-Join || 
Remarks ||
+ ||# of rows || # of rows || Sym-Hash Join || Merge-Join||
- ||100K||  100K||  1m30.588s||  0m41.764s||
+ ||100K||100K||1m30.588s|| 0m41.764s||
- ||5M||5M|| 1m5.579s|| 0m51.535s|| 
+ ||5M  ||  5M|| 1m5.579s|| 0m51.535s||
- |15M||15M||   2m55.812s|| 2m16.906s|| 
+ ||15M||15M||  2m55.812s|| 2m16.906s||
- ||15M||   15M||   5m21.030s|| 4m12.125s|| 
+ ||15M||15M||  5m21.030s|| 4m12.125s||
- ||20M||   20M||10m30.819s||5m42.302s||
+ ||20M||20M|| 10m30.819s|| 5m42.302s||
- ||20M ||20M   ||12m25.878s ||5m42.302s||  
+ ||20M||  20M|| 12m25.878s|| 5m42.302s||
- ||20M||   20M || 15m56.533s||  7m57.646s||
+ ||20M||20M|| 15m56.533s|| 7m57.646s||
- ||20M||   20M||   .   || 17m16.542s   ||112m20.478s
+ ||20M || 20M|| 112m20.478s||17m16.542s||   
- ||100M||20M|| 30mins||15mins||
+ ||100M || 20M|| 30mins||  15mins ||
- ||50M ||100M  ||42mins||28mins
+ ||50M || 100M|| 42mins||28mins||   
- ||50M||   100M||  41mins, 56sec|| 28mins||
+ ||50M || 100M|| 41mins, 56sec||   28mins||   
- ||50M ||100M||42mins||28mins  
+ ||50M || 100M|| 42mins||  28mins  ||   
- ||100M||100M  ||74mins||  44mins||
+ ||100M ||100M|| 74mins||  44mins||   
  
  
  == Phase 2 ==


[Pig Wiki] Update of "PigMergeJoin" by AshutoshChauhan

2009-08-20 Thread Apache Wiki

The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

--
  inputs to a merge join.  If users wish to do three plus way joins with this 
algorithm they can decompose their joins into a series of two ways joins.
  
  
+ == Performance Benchmark ==
+ ||# of rows || # of rows || Sym-Hash Join || Merge-Join || Skew-Join || 
Remarks ||
+ ||100K||  100K||  1m30.588s||  0m41.764s||
+ ||5M||5M|| 1m5.579s|| 0m51.535s|| 
+ |15M||15M||   2m55.812s|| 2m16.906s|| 
+ ||15M||   15M||   5m21.030s|| 4m12.125s|| 
+ ||20M||   20M||10m30.819s||5m42.302s||
+ ||20M ||20M   ||12m25.878s ||5m42.302s||  
+ ||20M||   20M || 15m56.533s||  7m57.646s||
+ ||20M||   20M||   .   || 17m16.542s   ||112m20.478s
+ ||100M||20M|| 30mins||15mins||
+ ||50M ||100M  ||42mins||28mins
+ ||50M||   100M||  41mins, 56sec|| 28mins||
+ ||50M ||100M||42mins||28mins  
+ ||100M||100M  ||74mins||  44mins||
+ 
+ 
  == Phase 2 ==
  
  Phase 1, which was committed in r804310, has a few limitations. Those limitations 
are enumerated below with possible solutions:


[Pig Wiki] Update of "PigMergeJoin" by AshutoshChauhan

2009-08-20 Thread Apache Wiki

The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

--
  
  == Problem Statement ==
  Often users' data is stored such that both inputs are already totally sorted 
on the join key.  In this case, it is possible to join the data
- in the map phase of a map reduce job.  This will provide a significant 
performance speed up compared to passing all of the data through uneeded sort 
and shuffle phases.
+ in the map phase of a map reduce job.  This will provide a significant 
performance speed up compared to passing all of the data through unneeded sort 
and shuffle phases.
  
  == Proposed Solution ==
  Pig will implement a merge join algorithm (or sort-merge join, although in 
this case the sort is already assumed to have been done).  As with other join 
algorithm
@@ -15, +15 @@

  }}}
  
  Pig will implement this algorithm by selecting the left input of the join to 
be the input file for the map phase, and the right input of the join to be the 
side file.
- It will then sample records
- from the right input to build an index that that contains, for each sampled 
record, the key and the offset into the file the record begins
+ It will then sample records from the right input to build an index that 
contains, for each sampled record, the key(s), the filename, and the offset into 
the file the record begins
  at.  This sampling will be done in an initial map only job.  A second MR job 
will then be initiated, with the left input as its input.  Each map will use 
the index to 
   seek to the appropriate record in the right input and begin doing the join.
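
The index-and-seek scheme described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not Pig's implementation: the right input is modelled as in-memory lists of (key, value) records, the "offset" is a list position rather than a byte offset, and the names build_index, seek, right_stream and merge_join are hypothetical.

```python
import bisect

def build_index(right_files, step=2):
    """Sample every `step`-th record: one (key, filename, offset) entry each.

    `right_files` is a list of (filename, records) pairs in global sort order,
    where records is a key-sorted list of (key, value) tuples (an assumption).
    """
    index = []
    for filename, records in right_files:
        for offset in range(0, len(records), step):
            index.append((records[offset][0], filename, offset))
    return index

def seek(index, first_left_key):
    """Return the last index entry whose key is strictly below the first left key.

    Strictly below, because records equal to the first left key may start
    before the next sampled record.
    """
    keys = [entry[0] for entry in index]
    pos = bisect.bisect_left(keys, first_left_key) - 1
    return index[max(pos, 0)]

def right_stream(right_files, filename, offset):
    """Yield right records starting at (filename, offset) and continuing on."""
    started = False
    for name, records in right_files:
        if name == filename:
            started = True
            yield from records[offset:]
        elif started:
            yield from records

def merge_join(left, right):
    """Join a key-sorted list `left` with a key-sorted iterable `right`."""
    out = []
    right = iter(right)
    r = next(right, None)
    i = 0
    while i < len(left) and r is not None:
        key = left[i][0]
        if r[0] < key:
            r = next(right, None)   # right side is behind: advance it
            continue
        if r[0] > key:
            i += 1                  # left side is behind: advance it
            continue
        j = i                       # find the run of left records with this key
        while j < len(left) and left[j][0] == key:
            j += 1
        while r is not None and r[0] == key:
            out.extend((key, lv, r[1]) for _, lv in left[i:j])
            r = next(right, None)
        i = j
    return out

right_files = [("part-0", [(1, "a"), (2, "b"), (3, "c")]),
               ("part-1", [(4, "d"), (5, "e"), (5, "f")])]
left_split = [(4, "x"), (5, "y")]   # one map's share of the left input

index = build_index(right_files)
_, filename, offset = seek(index, left_split[0][0])
joined = merge_join(left_split, right_stream(right_files, filename, offset))
```

Here the map handling left_split starts reading the right input at the sampled record with key 3 instead of scanning from the beginning of part-0.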
  
@@ -34, +33 @@

  
  == Implementation Details ==
  === Logical Plan ===
- In the logical plan, use of this join will be recorded in !LOJoin (similar to 
the way fragment-replicate join and skew join are).  (The work to convert FR 
Join and Skew
+ In the logical plan, use of this join will be recorded in !LOJoin (similar to 
the way fragment-replicate join and skew join are).  
- join to use a common LOJoin is not yet done; we shold coordinate work on this 
join with the work on the skew join to avoid duplicating effort.)
- 
  === Physical Plan ===
  In the physical plan a !POMergeJoin operator will be created.  It will 
contain the logic to implement the join.  The logic will be:
  
@@ -61, +58 @@

  === Map Reduce Plan ===
  
  The MR compiler will introduce a sampling MR job before the MR job that 
contains the !POMergeJoin.  (The sampling algorithm is described below.)
- This sampling job can read as input the output of the previous map
+ This sampling job can read as input the output of the previous map 
  reduce job (or if there is no previous map reduce job the initial input file) 
even if there are physical operators before the !POMergeJoin in the current MR 
job.  That
- is, there is no need to create a MR boundary immediately before the sampling 
as there is with order by or skew join.  For example:
+ is, no MR boundary is created immediately before the sampling as there is 
with order by or skew join.  For example:
  
  {{{
  A = load 'input1';
@@ -77, +74 @@

  {{{
  Job 1:
  Map: JoinSampleLoader 
- Reduce:
+ Reduce: Sort the index.
  
  Job 2:
  Map: filter->join
@@ -87, +84 @@

  The reason for this difference is that the key location in the file is not 
affected by the filter, and thus the sample need not be taken after the filter 
whereas in the
  skew join and order by cases the skew of the key may be affected by the 
filter.
  
- The sampling algorithm will need to record the key and the offset into the 
input file that the record begins at.  This can be done by subclassing 
!RandomSampleLoader to
+ The sampling algorithm will need to record the key, the filename, and the offset 
into the input file that the record begins at.  This is done by 
MergeJoinIndexer,
+ which extracts the keys from each tuple and appends the filename and 
offset.  
- create a sampler that appends the offset information to the tuple.  This will 
avoid recreating the sampling algorithm, and allow the sampler to benefit from 
planned
- enhancements of !RandomSampleLoader.
  
  How many records per block to sample (thus how large to make the index) is 
not clear.  Initially we
  should have it sample one record per block.  We can then experiment to 
understand the space and performance trade offs of increasing the number of 
records sampled per
@@ -105, +101 @@

  implementation !POMergeJoin does not know how far past the end of its input 
to keep accepting non-matching keys on the right side.  It will need to know 
what key the next
  block of the left input starts on in order to determine when it should stop 
reading keys from the right input.  A sampling pass on the left input that 
reads th

[Pig Wiki] Update of "PigMergeJoin" by AshutoshChauhan

2009-08-14 Thread Apache Wiki

The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

--
  
  getNext() in POMergeJoin should be updated so that it doesn't make this 
assumption.
  
-  
+ ''' Performance ''' : Currently POMergeJoin buffers all the tuples belonging 
to the same key on the left side before looking at the right key. If the keys don't match, it 
throws away that buffer and moves on. A better way is to first look at both keys 
and then determine whether any buffering is needed. This will not save 
CPU time, because the amount of work is nearly equal in either case, but it 
may reduce the memory footprint: with a skewed key that doesn't match, we 
would otherwise consume memory unnecessarily.   
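
The "look at both keys first" idea can be sketched as below. This is a hypothetical Python illustration, not the POMergeJoin code: inputs are in-memory key-sorted lists of (key, value) tuples, and the function name and the buffered counter are assumptions added to make the memory effect visible.

```python
def merge_join_peek_first(left, right):
    """Merge join that compares the current keys before buffering a left run.

    Returns the joined rows and the total number of left tuples buffered.
    """
    out = []
    buffered = 0
    i = j = 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][0], right[j][0]
        if lkey < rkey:
            # No match is possible for this left key: skip its whole run
            # without ever holding it in memory.
            while i < len(left) and left[i][0] == lkey:
                i += 1
        elif lkey > rkey:
            j += 1
        else:
            # Keys match: only now buffer the left run for this key.
            buf = []
            while i < len(left) and left[i][0] == lkey:
                buf.append(left[i][1])
                i += 1
            buffered += len(buf)
            while j < len(right) and right[j][0] == lkey:
                out.extend((lkey, lv, right[j][1]) for lv in buf)
                j += 1
    return out, buffered

# A heavily skewed left key (1) that has no partner on the right side.
left = [(1, "a")] * 1000 + [(2, "b")]
right = [(2, "x"), (3, "y")]
rows, buffered = merge_join_peek_first(left, right)
```

In this example the 1000 tuples for key 1 are stepped over one at a time and only the single tuple for key 2 is buffered.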
  


[Pig Wiki] Update of "PigMergeJoin" by AshutoshChauhan

2009-08-14 Thread Apache Wiki

The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

--
  
  == Multiway Join ==
  This algorithm could theoretically be extended to support joins of three or 
more inputs.  For now it will not be.  Pig will give an error if users give 
more than two
- inputs to a merge join.  If users wish to do three plus way joins with this 
algorithm they can decompose their joins into a series of two ways joins.  
+ inputs to a merge join.  If users wish to do three plus way joins with this 
algorithm they can decompose their joins into a series of two ways joins.
  
+ 
+ == Phase 2 ==
+ 
+ Phase 1, which was committed in r804310, has a few limitations. Those limitations 
are enumerated below with possible solutions:
+ 
+ '''Predecessors''' :  Only filter and foreach are currently allowed as 
predecessors of Merge Join.
+ 
+ MRCompiler maintains state while compiling physical operators. Part of that state is the 
list of MR jobs which have already been created. These MR jobs contain pipelines of 
physical operators which have already been compiled. In the case of MergeJoin, 
at least two MR jobs will have been created by the time 
POMergeJoin is visited. POMergeJoin then needs to identify which of these MR jobs 
corresponds to the left input and which one corresponds to the right. It does so by 
matching its predecessor physical operators in the physical plan with the 
physical operators in the compiled MR jobs. But this is not a 
reliable way. Confusion arises especially when a preceding physical operator 
generated more than one MR job (e.g. in the case of order-by). To make Merge Join 
work in these scenarios we need a reliable way of knowing which physical 
operator belongs to which MR job. A proposal to fix this is to introduce a 
PhyOpToMROp map in the spirit of LogToPhyMap. More details at: 
https://issues.apache.org/jira/browse/PIG-858 
+ 
+ '''Sort order''' : Data must be sorted in ascending order.
+ 
+ In POMergeJoin, comparison of keys should be done by a comparator which can be 
set based on user input.
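
A pluggable comparator could look roughly like the following Python sketch. It is an assumption-laden illustration rather than the proposed Pig change: keys are assumed unique on each side to keep the loop short, and ascending, descending and merge_join_keys are hypothetical names.

```python
def ascending(a, b):
    """Three-way comparison: negative, zero or positive, like a Java Comparator."""
    return (a > b) - (a < b)

def descending(a, b):
    """Reverse of ascending, for inputs sorted from largest to smallest key."""
    return ascending(b, a)

def merge_join_keys(left, right, compare=ascending):
    """Return the keys present in both sorted inputs (unique keys assumed).

    `compare` must agree with the order in which both inputs are sorted.
    """
    matches = []
    i = j = 0
    while i < len(left) and j < len(right):
        c = compare(left[i], right[j])
        if c < 0:
            i += 1          # left key sorts first: advance left
        elif c > 0:
            j += 1          # right key sorts first: advance right
        else:
            matches.append(left[i])
            i += 1
            j += 1
    return matches

left_desc = [5, 3, 1]
right_desc = [4, 3, 1]
with_matching_order = merge_join_keys(left_desc, right_desc, descending)
with_wrong_order = merge_join_keys(left_desc, right_desc, ascending)
```

With the descending comparator the join finds keys 3 and 1; with the hard-wired ascending comparison the same inputs produce no matches, which is why the comparator needs to follow the user's declared sort order.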
+ 
+ ''' End-of-All-Input ''' : POMergeJoin needs to know when it is being called for the last 
time. It does so by checking the end-of-all-input flag. The problem is that it assumes that, 
when this flag is true, the pipeline is running without any input and with 
status EOP. This holds in all cases except when one of the 
predecessors of the merge join is streaming. Streaming also makes use of the 
end-of-all-input flag and can potentially generate one or more tuples when this 
flag is set.
+ 
+ getNext() in POMergeJoin should be updated so that it doesn't make this 
assumption.
+ 
+  
+