[
https://issues.apache.org/jira/browse/PIG-161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12605422#action_12605422
]
Alan Gates commented on PIG-161:
--------------------------------
As part of the pipeline rework, we have discovered some issues in the original
design regarding how we handle foreach/generate. In particular, these issues
affect how nested plans work. The original design looked like
this:
{noformat}
Foreach {
Plan nestedPlan; // one plan for the whole nested logic. All the
                 // operations in this plan would be relational operators.
}
Generate {
Plan[] nestedPlans; // one plan for each element that would be generated.
                    // All operators in this plan would be expression operators.
}
{noformat}
The runtime logic of foreach.getNext() was:
{noformat}
input = predecessor.getNext();
nestedPlan.attach(input);
rc = nestedPlan.getNext();
return rc;
{noformat}
and the runtime logic of generate.getNext() was:
{noformat}
rc = new tuple
for (np : nestedPlans) {
rc[i] = np.getNext();
}
handle any flattening;
return rc;
{noformat}
This led to a couple of issues.
# Nested plans that are DAGs rather than trees (which are quite common) are
hard to handle.
# Generate was not properly running all tuples through the nested plan before
passing the input on to its attached plans. This led to aggregate functions
getting only the first tuple in a bag instead of all the tuples. Generate
could not easily be changed to address this, as it has no way to tell when it
does and does not need to keep pulling tuples.
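To make the first-tuple problem concrete, here is a toy sketch in plain Java (lists stand in for Pig's tuple and bag types; none of these names are Pig's real classes): the old generate handed each single pull from the nested plan straight to the aggregate, while the proposed accumulate drains the pipeline into a bag first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy reproduction of the aggregate bug (illustrative only, not Pig code).
public class GenerateBugSketch {

    // Old generate: one getNext() on the nested plan per call, so the
    // aggregate is handed a single tuple and COUNT reports 1.
    public static long buggyCount(Queue<List<Object>> nestedPipeline) {
        List<List<Object>> seenByAggregate = new ArrayList<>();
        seenByAggregate.add(nestedPipeline.poll()); // only the first tuple
        return seenByAggregate.size();
    }

    // Proposed accumulate: drain the nested pipeline into a bag, then hand
    // the whole bag to the aggregate, so COUNT sees every tuple.
    public static long accumulatedCount(Queue<List<Object>> nestedPipeline) {
        List<List<Object>> bag = new ArrayList<>();
        while (!nestedPipeline.isEmpty()) {
            bag.add(nestedPipeline.poll());
        }
        return bag.size();
    }

    public static void main(String[] args) {
        List<List<Object>> tuples = Arrays.asList(
                Arrays.asList(1), Arrays.asList(2), Arrays.asList(3));
        System.out.println(buggyCount(new ArrayDeque<>(tuples)));       // 1
        System.out.println(accumulatedCount(new ArrayDeque<>(tuples))); // 3
    }
}
```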
To address these issues, we propose the following design.
{noformat}
Foreach {
Plan[] nestedPlans; // one plan for each element in the projection. May be
                    // relational or expression operators.
}
Accumulate {
Plan nestedPlan; // Consists entirely of expression operators.
}
{noformat}
In the case where the nested plan contains a relational operator (which means
there was an actual nested section in the script), a new relational
operator, accumulate, will be added to the end of the plan. Its task will be
to accumulate tuples from the nested pipeline, construct a bag, and
attach that bag as the input to any expression operators given as part of the
generate projection.
So the runtime logic of foreach.getNext() will now be:
{noformat}
input = predecessor.getNext();
rc = new tuple;
for (np : nestedPlans) {
np.attach(input);
rc[i] = np.getNext();
}
handle any flattening;
return rc;
{noformat}
and the runtime logic of accumulate.getNext() will be:
{noformat}
bag = new bag;
while ((t = predecessor.getNext()) != EOP)
    bag.add(t);
nestedPlan.attach(bag);
return nestedPlan.getNext();
{noformat}
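Under some simplifying assumptions, the proposed contract can be sketched in plain Java: a tuple is just a List&lt;Object&gt;, a nested plan is reduced to a function from the attached input to one value, and flattening is omitted. None of these names are Pig's actual classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch of the proposed pull model (illustrative names, not Pig's classes).
public class ForeachSketch {

    // foreach.getNext(): attach the input tuple to every nested plan and
    // collect one value per plan into the output tuple (flattening omitted).
    public static List<Object> foreachGetNext(
            List<Object> input, List<Function<List<Object>, Object>> nestedPlans) {
        List<Object> rc = new ArrayList<>();
        for (Function<List<Object>, Object> np : nestedPlans) {
            rc.add(np.apply(input)); // np.attach(input); rc[i] = np.getNext();
        }
        return rc;
    }

    // accumulate.getNext(): hand the bag built from the nested pipeline to
    // the expression plan and return that plan's single result, which may be
    // of any type.
    public static Object accumulateGetNext(
            List<List<Object>> bag, Function<List<List<Object>>, Object> exprPlan) {
        return exprPlan.apply(bag);
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // One grouped tuple from "B = group A by $0": (group, bag).
        List<Object> grouped = Arrays.asList("a",
                Arrays.asList(Arrays.asList("a", 1), Arrays.asList("a", 2)));
        Function<List<Object>, Object> project0 = t -> t.get(0);
        // Stands in for: project(1) -> accumulate{ project( * ) -> COUNT() }
        Function<List<Object>, Object> count1 = t -> accumulateGetNext(
                (List<List<Object>>) t.get(1), bag -> (long) bag.size());
        System.out.println(foreachGetNext(grouped, Arrays.asList(project0, count1)));
        // prints [a, 2]
    }
}
```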
For clarity, let us consider a couple of use cases.
Case 1: no nested section in the script.
{noformat}
A = load 'myfile';
B = group A by $0;
C = foreach B generate group, COUNT($1), SUM($1.$1);
{noformat}
The plans for this will be:
Top level plan:
load -> group -> foreach
The foreach will have three nested plans:
plan 1: project(0)
plan 2: project(1) -> COUNT()
plan 3: project(1) -> project(1) -> SUM()
So for each tuple it gets, foreach will attach it to each of the above three
plans, and then call getNext() on the leaves of those plans. It will take
the resulting three values and construct a tuple and return that as the result
of its getNext().
One thing to note here is that in plan 3, project is being used in two different ways.
The first project is being handed a tuple, and asked to output
a single field (which happens to be a bag). The second project is being handed
a bag and asked to output a bag. The schema of its input bag is
{(int, int)} and the schema of its output bag is {(int)}. So project needs to
know when it is being used to output a bag vs when it is being asked to
output a field from a tuple, which may be of any type.
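The two behaviors can be sketched in plain Java, with lists standing in for Pig's tuple and bag types (illustrative only, not Pig's POProject): one method hands back a single field of a tuple, whatever its type; the other maps a projection over every tuple of a bag, e.g. {(int, int)} -> {(int)}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of project's two modes (illustrative only, not Pig's POProject).
public class ProjectSketch {

    // project(i) on a tuple: return field i, which may be of any type.
    public static Object projectField(List<Object> tuple, int i) {
        return tuple.get(i);
    }

    // project(i) on a bag: return a bag whose tuples keep only field i,
    // e.g. {(int, int)} -> {(int)}.
    public static List<List<Object>> projectBag(List<List<Object>> bag, int i) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> t : bag) {
            out.add(Collections.singletonList(t.get(i)));
        }
        return out;
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // A grouped tuple: (group, {(int, int)})
        List<List<Object>> innerBag = Arrays.asList(
                Arrays.asList(1, 10), Arrays.asList(2, 20));
        List<Object> tuple = Arrays.asList("k", innerBag);
        // First project: field 1, which happens to be a bag.
        Object bag = projectField(tuple, 1);
        // Second project: {(1,10),(2,20)} -> {(10),(20)}, ready for SUM().
        System.out.println(projectBag((List<List<Object>>) bag, 1));
        // prints [[10], [20]]
    }
}
```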
Case 2: nested section in the script
{noformat}
A = load 'myfile';
B = group A by $0;
C = foreach B {
C1 = distinct $1;
C2 = filter $1 by $1 > 0;
generate group, COUNT(C1), SUM(C2.$1);
};
{noformat}
The plans for this will be:
Top level plan:
load -> group -> foreach
The foreach will have three nested plans:
plan 1: project(0)
plan 2: project(1) -> distinct -> accumulate
The accumulate will have a nested plan of: project( * ) -> COUNT()
plan 3: project(1) -> filter -> accumulate
The accumulate will have a nested plan of: project(1) -> SUM()
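As a rough walk-through of plan 2 above on one grouped tuple (plain Java lists standing in for tuples and bags; none of this is Pig's real API), each stage of project(1) -> distinct -> accumulate{ project( * ) -> COUNT() } maps to one line:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative evaluation of case 2's plan 2 (not Pig code).
public class NestedPlanSketch {

    @SuppressWarnings("unchecked")
    public static long plan2(List<Object> groupedTuple) {
        List<List<Object>> bag = (List<List<Object>>) groupedTuple.get(1); // project(1)
        Set<List<Object>> deduped = new LinkedHashSet<>(bag);              // distinct
        List<List<Object>> rebuilt = new ArrayList<>(deduped);             // accumulate
        return rebuilt.size();                            // project( * ) -> COUNT()
    }

    public static void main(String[] args) {
        // (group, {(1,2),(1,2),(3,4)}): the duplicate (1,2) is dropped.
        List<Object> grouped = Arrays.asList("k", Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(1, 2), Arrays.asList(3, 4)));
        System.out.println(plan2(grouped)); // COUNT(C1) = 2
    }
}
```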
So effectively, we're proposing several changes:
# Removal of generate as a relational operator. Its functionality will be
absorbed into foreach.
# Splitting of the nested plan into one plan per generated element. This
sacrifices some optimizations. Consider, for example, if in the script above
it had been C2 = filter C1 by $1 > 0. Ideally we would evaluate distinct only
once; under this proposal we would evaluate it twice. For now that is
acceptable, because evaluating it once and then splitting the output is much
more difficult.
# Creation of "bookend" operators. The project will facilitate transition from
expression operators to relational operators at the top of these nested plans
by taking a bag attached as input and streaming out the tuples one at a time
(this functionality is already in place, built for the original generate
implementation). The accumulate will facilitate transitions from relations to
an expression operator. One oddity here will be that it will be the only
relational operator that can return a type other than bag. It may return any
type.
This proposal will entail the following changes:
On the physical side:
# Change project.getNext(Bag) to handle the case where it's given a bag. In
this case it should return a bag, stripping each tuple in the bag down to only
the field(s) being projected.
# Change foreach to handle functionality previously in generate, including the
flattening logic.
# Create an accumulate operator.
On the logical side:
# Changes to project.getSchema() to figure out when project needs to return a
bag vs. when it may return any type.
# Changes to parsing of foreach to decide when an accumulate is necessary in
the plan, and when it isn't.
# Changes to foreach.getSchema() to take on much of the previous functionality
of generate.getSchema().
# These changes will most likely force changes in the type checker as well.
> Rework physical plan
> --------------------
>
> Key: PIG-161
> URL: https://issues.apache.org/jira/browse/PIG-161
> Project: Pig
> Issue Type: Sub-task
> Reporter: Alan Gates
> Assignee: Alan Gates
> Attachments: arithmeticOperators.patch, BinCondAndNegative.patch,
> CastAndMapLookUp.patch, incr2.patch, incr3.patch, incr4.patch, incr5.patch,
> logToPhyTranslator.patch, missingOps.patch,
> MRCompilerTests_PlansAndOutputs.txt, Phy_AbsClass.patch, physicalOps.patch,
> physicalOps.patch, physicalOps.patch, physicalOps.patch,
> physicalOps_latest.patch, POCast.patch, POCast.patch, podistinct.patch,
> pogenerate.patch, pogenerate.patch, pogenerate.patch, posort.patch,
> POUserFuncCorrection.patch,
> TEST-org.apache.pig.test.TestLocalJobSubmission.txt,
> TEST-org.apache.pig.test.TestLogToPhyCompiler.txt,
> TEST-org.apache.pig.test.TestLogToPhyCompiler.txt,
> TEST-org.apache.pig.test.TestMapReduce.txt,
> TEST-org.apache.pig.test.TestTypeCheckingValidator.txt,
> TEST-org.apache.pig.test.TestUnion.txt, translator.patch, translator.patch,
> translator.patch, translator.patch
>
>
> This bug tracks work to rework all of the physical operators as described in
> http://wiki.apache.org/pig/PigTypesFunctionalSpec