In the current version, the combiner is not used with cogroup. With the
pipeline rework going on in the types branch, the combiner will be used
for cogroups like:
C = cogroup A, B;
D = foreach C generate project, algebraic, algebraic, ...
where project is a non-UDF expression that projects fields from C and
algebraic represents an algebraic UDF on one of the fields of C.
Projections that are flattened will not be combined, because all the
records are necessary to properly materialize the cross product. That
means the optimization proposed in PIG-350 won't interact with the
combiner.
As for cross and the combiner, we don't yet have a combiner algorithm
for optimizing cross. This is doable but complicated. Are you
currently using cross? We had not focused on this as an optimization
area because we were not aware of people who used it.
You mention using the combiner with filters. Were you wanting us to
catch cases like:
B = group A;
C = filter B by $0 > 5;
D = foreach C generate group, COUNT(A);
and push both the filter and the foreach into the combiner? That is
possible, but we have put it off in favor of pushing the filter above
the group instead. (We don't do this filter pushing yet, but work is
ongoing to develop an optimizer that will handle these kinds of
optimizations.) The only case we could think of where you wouldn't want
to push the filter (and we won't) is when the filter involves a UDF
that might be very expensive to call, so you want to wait until after
the data is grouped to minimize the number of calls to the UDF.
Alan.
Mridul Muralidharan wrote:
This would be absolutely great!
Btw, hope this continues to work fine with combiners in the case of
COGROUP + FILTER (combiners are applicable in this case, right? Or
only for group?).
Additionally, what would the impact of this be on CROSS + FILTER? (I
am assuming that CROSS + FILTER is not combinable currently.)
Thanks,
Mridul
Alan Gates (JIRA) wrote:
Join optimization for pipeline rework
-------------------------------------
Key: PIG-350
URL: https://issues.apache.org/jira/browse/PIG-350
Project: Pig
Issue Type: Bug
Components: impl
Affects Versions: types_branch
Reporter: Alan Gates
Assignee: Daniel Dai
Priority: Critical
Fix For: types_branch
Currently, joins in pig are done as groupings where each input is
grouped on the join key. In the reduce phase, records from each
input are collected into a bag for each key, and then a cross product
done on these bags. This can be optimized by selecting one
(hopefully the largest) input and streaming through it rather than
placing the results in a bag. This will result in better memory
usage, fewer spills to disk due to bag overflow, and better
performance. Ideally, the system would intelligently select which
input to stream, based on a histogram of value distributions for the
keys. Pig does not have that kind of metadata. So for now it is
best to always pick the same input (first or last) so that the user
can select which input to stream.
Similarly, order by in pig is done in this same way, with the
grouping keys being the ordering keys, and only one input. In this
case pig still currently collects all the records for a key into a
bag, and then flattens the bag. This is a total waste, and in some
cases causes significant performance degradation. The same
optimization listed above can address this case, where the last bag
(in this case the only bag) is streamed rather than collected.
To do these operations, a new POJoinPackage will be needed. It will
replace POPackage and the following POForEach in these types of
scripts, handling pulling the records from hadoop and streaming them
into the pig pipeline. A visitor will need to be added in the map
reduce compilation phase that detects this case and combines the
POPackage and POForEach into this new POJoinPackage.