Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The following page has been changed by AshutoshChauhan:
http://wiki.apache.org/pig/PigMergeJoin

------------------------------------------------------------------------------
  == Problem Statement ==
  Often users' data is stored such that both inputs are already totally sorted on the join key. In this case, it is possible to join the data
- in the map phase of a map reduce job. This will provide a significant performance speed up compared to passing all of the data through uneeded sort and shuffle phases.
+ in the map phase of a map reduce job. This will provide a significant performance speed up compared to passing all of the data through unneeded sort and shuffle phases.
  == Proposed Solution ==
  Pig will implement a merge join algorithm (or sort-merge join, although in this case the sort is already assumed to have been done). As with other join algorithm
@@ -15, +15 @@
  }}}
  Pig will implement this algorithm by selecting the left input of the join to be the input file for the map phase, and the right input of the join to be the side file.
- It will then sample records
- from the right input to build an index that contains, for each sampled record, the key and the offset into the file the record begins
+ It will then sample records from the right input to build an index that contains, for each sampled record, the key(s), the filename and the offset into the file the record begins
  at. This sampling will be done in an initial map only job. A second MR job will then be initiated, with the left input as its input. Each map will use the index to seek to the appropriate record in the right input and begin doing the join.
@@ -34, +33 @@
  == Implementation Details ==
  === Logical Plan ===
- In the logical plan, use of this join will be recorded in !LOJoin (similar to the way fragment-replicate join and skew join are). (The work to convert FR Join and Skew
+ In the logical plan, use of this join will be recorded in !LOJoin (similar to the way fragment-replicate join and skew join are).
- join to use a common LOJoin is not yet done; we should coordinate work on this join with the work on the skew join to avoid duplicating effort.)
-
  === Physical Plan ===
  In the physical plan a !POMergeJoin operator will be created. It will contain the logic to implement the join. The logic will be:
@@ -61, +58 @@
  === Map Reduce Plan ===
  The MR compiler will introduce a sampling MR job before the MR job that contains the !POMergeJoin. (The sampling algorithm is described below.)
- This sampling job can read as input the output of the previous map
+ This sampling job can read as input the output of the previous map
  reduce job (or if there is no previous map reduce job the initial input file) even if there are physical operators before the !POMergeJoin in the current MR job. That
- is, there is no need to create a MR boundary immediately before the sampling as there is with order by or skew join. For example:
+ is, no MR boundary is created immediately before the sampling, unlike with order by or skew join. For example:
  {{{
  A = load 'input1';
@@ -77, +74 @@
  {{{
  Job 1:
      Map: JoinSampleLoader
-     Reduce:
+     Reduce: Sort the index.
  Job 2:
      Map: filter->join
@@ -87, +84 @@
  The reason for this difference is that the key location in the file is not affected by the filter, and thus the sample need not be taken after the filter whereas in the skew join and order by cases the skew of the key may be affected by the filter.
- The sampling algorithm will need to record the key and the offset into the input file that the record begins at. This can be done by subclassing !RandomSampleLoader to
+ The sampling algorithm will need to record the key, the filename and the offset into the input file that the record begins at. MergeJoinIndexer does this: it is used to
+ create a sampler that extracts the keys from each tuple and appends the filename and offset.
- create a sampler that appends the offset information to the tuple. This will avoid recreating the sampling algorithm, and allow the sampler to benefit from planned
- enhancements of !RandomSampleLoader.
  How many records per block to sample (thus how large to make the index) is not clear. Initially we should have it sample one record per block. We can then experiment to understand the space and performance trade offs of increasing the number of records sampled per
@@ -105, +101 @@
  implementation !POMergeJoin does not know how far past the end of its input to keep accepting non-matching keys on the right side. It will need to know what key the next block of the left input starts on in order to determine when it should stop reading keys from the right input. A sampling pass on the left input that reads the first key of each block could provide this information.
+
+ In the current implementation (r806281), only inner joins are supported.
  == Multiway Join ==
  This algorithm could theoretically be extended to support joins of three or more inputs. For now it will not be. Pig will give an error if users give more than two

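The page describes a merge join performed in the map phase on two inputs that are already sorted on the join key. It can be illustrated with a minimal Python sketch. This is not Pig's Java !POMergeJoin. The function name `merge_join` and the (key, payload) record shape are hypothetical. Like the current implementation noted above (r806281), the sketch handles only inner joins. It assumes both inputs are sorted ascending and buffers the right-side records that share a key, so that duplicates on either side produce every matching pair.

```python
from typing import Any, Iterable, Iterator, List, Tuple

Record = Tuple[Any, Any]  # hypothetical (join key, payload) pair


def merge_join(left: Iterable[Record], right: Iterable[Record]) -> Iterator[Tuple[Any, Any, Any]]:
    """Inner-join two inputs that are both sorted ascending on the join key.

    Yields (key, left_payload, right_payload) for every matching pair.
    """
    left_it, right_it = iter(left), iter(right)
    l = next(left_it, None)
    r = next(right_it, None)
    while l is not None and r is not None:
        if l[0] < r[0]:
            # All remaining right keys are >= r[0] > l[0], so this left record cannot match.
            l = next(left_it, None)
        elif l[0] > r[0]:
            # The right side is behind; skip right records until it catches up.
            r = next(right_it, None)
        else:
            key = l[0]
            # Buffer every right payload that shares this key (duplicates are allowed).
            group: List[Any] = []
            while r is not None and r[0] == key:
                group.append(r[1])
                r = next(right_it, None)
            # Pair each left record with this key against the buffered right group.
            while l is not None and l[0] == key:
                for payload in group:
                    yield key, l[1], payload
                l = next(left_it, None)


# Example usage with small, already-sorted inputs.
left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
joined = list(merge_join(left, right))
```

Each side is read once, in order, and only forward. Memory in this sketch is therefore bounded by the largest group of right records sharing one key, not by the size of either input. That property is what makes it feasible to skip the sort and shuffle phases.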
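The page also describes an index built by sampling the right input. Each entry holds the key, the filename and the offset where a sampled record begins, with one record sampled per block initially. Each map uses this index to seek into the right input. Both steps can be sketched in Python under stated assumptions. The right input is simulated as an in-memory byte string of sorted `key<TAB>value` lines, "blocks" are fixed byte ranges, and keys are single strings rather than key tuples. The names `build_index`, `seek_offset` and `scan_from` are hypothetical and are not the MergeJoinIndexer API.

```python
import bisect
from typing import Iterator, List, Tuple

IndexEntry = Tuple[str, str, int]  # (key, filename, byte offset where the record begins)


def iter_records(data: bytes) -> Iterator[Tuple[int, str, str]]:
    """Yield (offset, key, value) for each newline-terminated 'key<TAB>value' line."""
    offset = 0
    for line in data.splitlines(keepends=True):
        key, _, value = line.rstrip(b"\n").decode().partition("\t")
        yield offset, key, value
        offset += len(line)


def build_index(filename: str, data: bytes, block_size: int) -> List[IndexEntry]:
    """Sample the first record that begins in each block (one sample per block)."""
    index: List[IndexEntry] = []
    next_block_start = 0
    for offset, key, _ in iter_records(data):
        if offset >= next_block_start:
            index.append((key, filename, offset))
            # Skip the rest of the block this record starts in.
            next_block_start = (offset // block_size + 1) * block_size
    index.sort()  # stands in for the "Sort the index" reduce step
    return index


def seek_offset(index: List[IndexEntry], key: str) -> Tuple[str, int]:
    """Return (filename, offset) from which a forward scan reaches every record with `key`."""
    keys = [entry[0] for entry in index]
    i = bisect.bisect_left(keys, key)  # first entry whose key is >= key
    # Records with this key may begin before that entry, so step back to the last
    # entry whose key is strictly smaller (or use the first entry if there is none).
    _, filename, offset = index[max(i - 1, 0)]
    return filename, offset


def scan_from(data: bytes, start: int, key: str) -> List[str]:
    """Read forward from `start` and collect the values whose key equals `key`."""
    values = []
    for _, k, value in iter_records(data[start:]):
        if k < key:
            continue
        if k > key:
            break  # the input is sorted, so no later record can match
        values.append(value)
    return values


# Example: six 4-byte records split into 8-byte "blocks".
rows = [("a", "1"), ("b", "2"), ("b", "3"), ("b", "4"), ("c", "5"), ("d", "6")]
data = b"".join(f"{k}\t{v}\n".encode() for k, v in rows)
index = build_index("right", data, block_size=8)
```

In this example the first `b` record begins at offset 4, before the index entry for `b` at offset 8. Stepping back to the entry for `a` is what keeps that record in the join. Sampling more records per block would shorten the forward scan at the cost of a larger index, which is the trade-off the page proposes to measure.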