Join optimization for pipeline rework
-------------------------------------
Key: PIG-350
URL: https://issues.apache.org/jira/browse/PIG-350
Project: Pig
Issue Type: Bug
Components: impl
Affects Versions: types_branch
Reporter: Alan Gates
Assignee: Daniel Dai
Priority: Critical
Fix For: types_branch
Currently, joins in Pig are done as groupings where each input is grouped on
the join key. In the reduce phase, records from each input are collected into
a bag for each key, and then a cross product is done on these bags. This can be
optimized by selecting one (hopefully the largest) input and streaming through
it rather than placing its records in a bag. This will result in better memory
usage, fewer spills to disk due to bag overflow, and better performance.
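
As a rough illustration of the difference, here is a minimal Python sketch of
the reduce step for a single join key. It is not Pig code: the function names,
the (input_index, record) tagging, and the assumption that records of the last
input arrive after all others for the key are hypothetical, chosen only to show
the idea.

```python
from itertools import product

def join_with_bags(tagged_records, num_inputs):
    """Current approach: collect every input into its own bag, then cross them."""
    bags = [[] for _ in range(num_inputs)]
    for input_index, record in tagged_records:
        bags[input_index].append(record)
    # Cross product over all bags; each output row concatenates one tuple per input.
    return [sum(combo, ()) for combo in product(*bags)]

def join_streaming_last(tagged_records, num_inputs):
    """Proposed approach: bag every input except the last, stream the last one."""
    bags = [[] for _ in range(num_inputs - 1)]
    for input_index, record in tagged_records:
        if input_index < num_inputs - 1:
            bags[input_index].append(record)
        else:
            # Assumption: records of the last input arrive after all the others
            # for this key, so the smaller bags are already complete here.
            for combo in product(*bags):
                yield sum(combo, ()) + record
```

Both functions produce the same set of joined rows (in a different order), but
the second never holds the records of the streamed input in memory.
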
Ideally, the system would intelligently select which input to stream, based on
a histogram of value distributions for the keys. Pig does not have that kind
of metadata. So for now it is best to always pick the same input (first or
last) so that the user can select which input to stream.
Similarly, order by in Pig is done in this same way, with the grouping keys
being the ordering keys, and only one input. In this case Pig currently still
collects all the records for a key into a bag, and then flattens the bag. The
bag serves no purpose here, and in some cases it causes significant performance
degradation. The same optimization listed above can address this case, where
the last bag (in this case the only bag) is streamed rather than collected.
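
The order by case can be sketched the same way. The following Python is only an
illustration under the assumption that the reducer sees (key, record) pairs
already sorted by key; the function names are hypothetical and do not
correspond to Pig classes.

```python
from itertools import groupby

def order_by_with_bag(sorted_pairs):
    """Current approach: materialize each key's records in a bag, then flatten."""
    for _key, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        bag = [record for _, record in group]  # every record for the key is held in memory
        for record in bag:
            yield record

def order_by_streaming(sorted_pairs):
    """Proposed approach: pass each record straight through as it arrives."""
    for _key, record in sorted_pairs:
        yield record
```

The output is identical in both cases; the streaming version simply skips the
intermediate bag.
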
To do these operations, a new POJoinPackage will be needed. It will replace
POPackage and the following POForEach in these types of scripts, handling
pulling the records from Hadoop and streaming them into the Pig pipeline. A
visitor will need to be added in the map reduce compilation phase that detects
this case and combines the POPackage with POForEach into this new POJoinPackage.
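
One way to picture the visitor is as a rewrite over the reduce plan. The sketch
below uses a toy, linear plan in Python rather than Pig's actual plan and
visitor classes. The Op class and the flattens_all flag are assumptions made
for illustration, standing in for whatever test the real visitor would use to
recognize a POForEach that only flattens the bags produced by POPackage.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Op:
    kind: str                   # operator name, e.g. "POPackage", "POForEach"
    flattens_all: bool = False  # hypothetical: True if the foreach only flattens its bags

def fuse_join_package(reduce_plan: List[Op]) -> List[Op]:
    """Replace each POPackage + flattening POForEach pair with one POJoinPackage."""
    fused = []
    i = 0
    while i < len(reduce_plan):
        op = reduce_plan[i]
        nxt = reduce_plan[i + 1] if i + 1 < len(reduce_plan) else None
        if (op.kind == "POPackage" and nxt is not None
                and nxt.kind == "POForEach" and nxt.flattens_all):
            fused.append(Op("POJoinPackage"))
            i += 2  # skip both operators that were combined
        else:
            fused.append(op)
            i += 1
    return fused
```

A plan that does not match the pattern is returned unchanged, so the rewrite
would only affect join and order by style scripts.
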