Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "PigLogicalPlanOptimizerRewrite" page has been changed by AlanGates.
http://wiki.apache.org/pig/PigLogicalPlanOptimizerRewrite

--------------------------------------------------

New page:
== Problem Statement ==
The current implementation of the logical plan and the logical optimizer in Pig 
has proven to not be easily extensible.  Developer feedback has indicated that 
adding new
rules to the optimizer is quite burdensome.  In addition, the logical plan has 
been an area of numerous bugs, many of which have been difficult to fix.  
Developers also feel
that the logical plan is difficult to understand and maintain.  The root cause 
for these issues is that a number of design decisions that were made as part of 
the 0.2 rewrite of the front end have now proven to be
sub-optimal.  The heart of this proposal is to revisit a number of those 
decisions and rebuild the logical plan with a simpler design that will make it 
much easier to
maintain the logical plan as well as extend the logical optimizer.

=== Issues that Need to be Addressed in this Rework ===
'''One:'''  !OperatorPlan has far too many operations.  It has 29 public 
methods.  This needs to be pared down to a minimal set of operations that are 
well defined.

'''Two:''' Currently, relational operators (Join, Sort, etc.) and expression 
operators (add, equals, etc.) are both !LogicalOperators.  Operators such as 
Cogroup that contain expressions have !OperatorPlans that contain these 
expressions.  This was done for two reasons:
 1. To make it easier for visitors to visit both types of operators (that is, 
visitors didn't have to have separate logic to handle expressions).
 1. To better handle the ambiguous nature of inner plans in Foreach.
However, it has led to visitors and graphs that are hard to understand.  Both 
of the above concerns can be handled while breaking this binding so that 
relational and expression operators are separate types.

'''Three:'''  Related to the issue of relational and expression operators 
sharing a type is that inner plans have connections to outer plans.  Take for 
example a script like

{{{
A = load 'file1' as (x, y);
B = load 'file2' as (u, v);
C = cogroup A by x, B by u;
D = filter C by A.x > 0;
}}}
In this case the cogroup will have two inner plans, one of which will be a 
project of A.x and the other a project B.u.  The !LOProject objects 
representing these projections
will hold actual references to the !LOLoad operators for A and B.  This makes 
disconnecting and rearranging nodes in the plan much more difficult.  Consider 
if the optimizer wants to
move the filter in D above C.  Now it has to not only change connections in the 
outer plan between load, cogroup, and filter; it also has to change connections 
in the
first inner plan of C, because this now needs to point to the !LOFilter for D 
rather than the !LOLoad for A.

'''Four:'''  The work done on Operator and !OperatorPlan to support the 
original rules for the optimizer had two main problems:
 1. The set of primitives chosen was not the correct one.
 1. The operations chosen were put on the generic super classes (Operator) 
rather than further down on the specific classes that would know how to 
implement them.

'''Five:'''  At a number of points efforts were made to keep the logical plan 
close to the physical plan.  For example, !LOProject represents all of the same 
operations that
!POProject does.  While this is convenient in translation, it is not convenient 
when trying to optimize the plan.  The !LogicalPlan needs to focus on 
representing the logic of
the script in a way that is easy for semantic checkers (such as !TypeChecker) 
and the optimizer to work with.

'''Six:'''  The rule of one operation per operator was violated.  !LOProject 
handles three separate roles (converting from a relational to an expression 
operator, actually
projecting, and converting from an expression to a relational operator).  This 
makes coding much more complex for the optimizer because when it encounters an 
!LOProject it
must first determine which of these three roles it is playing before it can 
understand how to work with it.

The following proposal will address all of these issues.

== Proposed Methodology ==
Fixing these issues will require extensive changes, including a complete 
rewrite of Operator, !OperatorPlan, !PlanVisitor, !LogicalOperator, 
!LogicalPlan,
!LogicalPlanVisitor, every current subclass of !LogicalOperator, and all 
existing optimizer rules.  It will also require extensive changes, though not 
complete rewrites, in
existing subclasses of !LogicalTransformer.  To avoid destabilizing the entire 
codebase during this operation, this will be done in a new set of packages as a 
totally separate
set of classes.  Linkage code will be written to translate the current 
!LogicalPlan to the new experimental !LogicalPlan class.  A new 
!LogicalToPhysicalTranslator will also
be written to translate this new !LogicalPlan to a !PhysicalPlan.  This code 
path will only be taken if some type of command line switch or property is set, 
thus insulating
current developers and users from this work.

This has the added advantage that it is easy to build a prototype first.  Given 
that our first implementation now needs rewriting, prototyping first will help 
us explore
whether we solved the problem correctly this time.

== The Actual Proposal ==

=== Changes to Plans ===
In general, the generic plan classes will be changing in a couple of important 
ways:

One, they will be made much simpler.  The goal will be to find a minimal set of 
operations that will enable all desired plan features.

Two, they will no longer be generics.  While nice in theory, generics led to 
two observed issues.  First, each class had so many type parameters that in 
practice developers have
worked around rather than used the features of the generics.  That is, 
parameterizing the classes seemed to get in developers' way, rather than help 
them.  Second, since we
propose to break relational and expression operators into different types, 
generic plan classes would make it impossible for a single visitor to span 
both types.  We do not wish to
prohibit this in all cases.


New Operator class.  Note that the function which was previously called `visit` 
has been renamed `accept` to avoid confusion with the `visit` method in 
!PlanVisitor.
{{{

package org.apache.pig.experimental.plan;

public abstract class Operator {
    
    protected String name;
    protected OperatorPlan plan; // plan that contains this operator

    public Operator(String n, OperatorPlan p) {...}

    /**
     * Accept a visitor at this node in the graph.
     * @param v Visitor to accept.
     */
    public abstract void accept(PlanVisitor v);

    public String getName() { ... }
    
    /**
     * Get the plan associated with this operator.
     * @return plan
     */
    public OperatorPlan getPlan() { ... }

}


}}}
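
The `accept`/`visit` split can be illustrated with a small double-dispatch sketch.  The classes below (`SketchVisitor`, `SketchNode`, `LoadNode`, `FilterNode`, `NameCollector`) are hypothetical simplifications and not part of the proposal; they only show how an operator's `accept` might call back into the `visit` overload that matches its concrete type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical visitor with one visit overload per concrete operator type.
interface SketchVisitor {
    void visit(LoadNode op);
    void visit(FilterNode op);
}

// Hypothetical simplified operator: accept() is the operator side of the dispatch.
abstract class SketchNode {
    protected final String name;
    SketchNode(String name) { this.name = name; }
    abstract void accept(SketchVisitor v);
    String getName() { return name; }
}

class LoadNode extends SketchNode {
    LoadNode() { super("load"); }
    // "this" has static type LoadNode here, so visit(LoadNode) is chosen.
    @Override void accept(SketchVisitor v) { v.visit(this); }
}

class FilterNode extends SketchNode {
    FilterNode() { super("filter"); }
    @Override void accept(SketchVisitor v) { v.visit(this); }
}

// Example visitor that records which operators it was handed.
class NameCollector implements SketchVisitor {
    final List<String> seen = new ArrayList<>();
    @Override public void visit(LoadNode op) { seen.add("visited " + op.getName()); }
    @Override public void visit(FilterNode op) { seen.add("visited " + op.getName()); }
}
```

A caller that holds only `SketchNode` references can invoke `accept` on each and still reach the type-specific `visit` method, which is why keeping the two method names distinct avoids confusion.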

New !OperatorPlan class.  Note the severe paring down of the number of 
operations.  Only simple add, remove, connect, and disconnect remain.  All other 
operations are left to subclasses to implement 
what makes sense for their plans.

{{{

package org.apache.pig.experimental.plan;

public abstract class OperatorPlan {

    protected Set<Operator> ops;
    protected PlanEdge fromEdges;
    protected PlanEdge toEdges;

    public OperatorPlan() { ... }
    
    /**
     * Get number of nodes in the plan.
     */
    public int size() { ... }

    /**
     * Get all operators in the plan that have no predecessors.
     * @return all operators in the plan that have no predecessors, or
     * an empty list if the plan is empty.
     */
    public List<Operator> getRoots() { ... }

    /**
     * Get all operators in the plan that have no successors.
     * @return all operators in the plan that have no successors, or
     * an empty list if the plan is empty.
     */
    public List<Operator> getLeaves() { ... }

    /**
     * For a given operator, get all operators immediately before it in the
     * plan.
     * @param op operator to fetch predecessors of
     * @return list of all operators immediately before op, or an empty list
     * if op is a root.
     * @throws IOException if op is not in the plan.
     */
    public List<Operator> getPredecessors(Operator op) throws IOException { ... }
    
    /**
     * For a given operator, get all operators immediately after it.
     * @param op operator to fetch successors of
     * @return list of all operators immediately after op, or an empty list
     * if op is a leaf.
     * @throws IOException if op is not in the plan.
     */
    public List<Operator> getSuccessors(Operator op) throws IOException { ... }

    /**
     * Add a new operator to the plan.  It will not be connected to any
     * existing operators.
     * @param op operator to add
     */
    public void add(Operator op) { ... }

    /**
     * Remove an operator from the plan.
     * @param op Operator to be removed
     * @throws IOException if the remove operation attempts to 
     * remove an operator that is still connected to other operators.
     */
    public void remove(Operator op) throws IOException { ... }
        
    /**
     * Connect two operators in the plan, controlling the positions in the
     * edge lists at which the from and to edges are placed.
     * @param from Operator edge will come from
     * @param fromPos Position in the array for the from edge
     * @param to Operator edge will go to
     * @param toPos Position in the array for the to edge
     */
    public void connect(Operator from,
                        int fromPos,
                        Operator to,
                        int toPos) { ... }
    
    /**
     * Connect two operators in the plan.
     * @param from Operator edge will come from
     * @param to Operator edge will go to
     */
    public void connect(Operator from, Operator to) { ... }
    
    /**
     * Disconnect two operators in the plan.
     * @param from Operator edge is coming from
     * @param to Operator edge is going to
     * @return pair of positions, indicating the position in the from and
     * to arrays.
     * @throws IOException if the two operators aren't connected.
     */
    public Pair<Integer, Integer> disconnect(Operator from,
                                             Operator to) throws IOException { ... }

}

}}}
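
To make the pared-down interface concrete, here is a minimal sketch of how the four primitives might behave.  `SketchOp` and `SketchPlan` are hypothetical stand-ins, not the proposed classes.  The edge storage (two maps of ordered lists) is an assumption, since `PlanEdge` is not specified above; for brevity the sketch throws unchecked exceptions rather than `IOException` and returns an `int[]` rather than a `Pair`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Operator: only a name.
class SketchOp {
    final String name;
    SketchOp(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

// Hypothetical stand-in for OperatorPlan exposing only the pared-down operations.
class SketchPlan {
    private final Set<SketchOp> ops = new LinkedHashSet<>();
    // Assumption: edges are kept as ordered adjacency lists per operator.
    private final Map<SketchOp, List<SketchOp>> successors = new LinkedHashMap<>();
    private final Map<SketchOp, List<SketchOp>> predecessors = new LinkedHashMap<>();

    int size() { return ops.size(); }

    // A newly added operator starts with no edges.
    void add(SketchOp op) {
        if (ops.add(op)) {
            successors.put(op, new ArrayList<>());
            predecessors.put(op, new ArrayList<>());
        }
    }

    // Refuses to remove an operator that still has edges.
    void remove(SketchOp op) {
        check(op);
        if (!successors.get(op).isEmpty() || !predecessors.get(op).isEmpty()) {
            throw new IllegalStateException(op + " is still connected");
        }
        ops.remove(op);
        successors.remove(op);
        predecessors.remove(op);
    }

    void connect(SketchOp from, SketchOp to) {
        check(from);
        check(to);
        successors.get(from).add(to);
        predecessors.get(to).add(from);
    }

    // Returns {position in from's successor list, position in to's predecessor list}.
    int[] disconnect(SketchOp from, SketchOp to) {
        check(from);
        check(to);
        int fromPos = successors.get(from).indexOf(to);
        int toPos = predecessors.get(to).indexOf(from);
        if (fromPos < 0 || toPos < 0) {
            throw new IllegalStateException(from + " and " + to + " are not connected");
        }
        successors.get(from).remove(fromPos);   // removes by index
        predecessors.get(to).remove(toPos);
        return new int[] { fromPos, toPos };
    }

    List<SketchOp> getRoots() { return withNoEdges(predecessors); }
    List<SketchOp> getLeaves() { return withNoEdges(successors); }

    List<SketchOp> getSuccessors(SketchOp op) {
        check(op);
        return new ArrayList<>(successors.get(op));
    }

    List<SketchOp> getPredecessors(SketchOp op) {
        check(op);
        return new ArrayList<>(predecessors.get(op));
    }

    private List<SketchOp> withNoEdges(Map<SketchOp, List<SketchOp>> edges) {
        List<SketchOp> result = new ArrayList<>();
        for (SketchOp op : ops) {
            if (edges.get(op).isEmpty()) result.add(op);
        }
        return result;
    }

    private void check(SketchOp op) {
        if (!ops.contains(op)) throw new IllegalArgumentException(op + " is not in the plan");
    }
}
```

With only these primitives, taking an operator out of the middle of a chain takes several explicit steps (two `disconnect` calls, one `remove`, one `connect`).  Higher-level operations that bundle those steps are the kind of thing left to plan subclasses.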

There are no significant changes to !PlanVisitor and !PlanWalker other than the 
removal of generics.  With the change that only !LOForeach now has inner plans, 
it isn't clear whether the
`pushWalker` and `popWalker` methods in !PlanVisitor are still useful.  These 
may be removed.

=== Changes to Logical Operators ===
There will be a number of important changes to !LogicalOperators.

First, as mentioned above, logical operators will be split into two disparate 
groups, relational and expression.

Second, inner plans and expression plans will no longer hold any explicit 
references to outer plans.  At most, they will reference the operator which 
contains the inner plan.

Third, operators will represent exactly one operation.

Fourth, only !LOForeach will have inner plans.  All other relational operators 
will
only have expressions.

Fifth, a new operator, called for now Identity but desperately seeking a better 
name,
will be introduced.  The sole purpose of this operator will be to act as a root
in foreach's inner plans.  So, given a script like:

{{{
A = load 'input';
B = group A by $0;
C = foreach B {
    C1 = B.$1;
    C2 = distinct C1;
    generate group, COUNT(C2);
}
}}}

the foreach's inner plan will have two Identity operators, one for $0 and one 
for $1.
This will allow the C1 !LOGenerate operator to connect to another logical 
operator.

Sixth, in the past expression operators were used at places in inner plans, 
such as
the `C1 = B.$1;` above.  Relational operators will always be used in these 
places
now.  !LOGenerate (or perhaps a new operator if necessary) will be used for 
these assignment operations instead.

Seventh, in the past !LOForeach had multiple inner plans, one for each of its 
outputs.
That will no longer be the case.  !LOForeach will always have exactly one inner 
plan,
which must terminate with a !LOGenerate.  That !LOGenerate will have 
expressions for
each of its outputs.

All logical operators will have a schema.  This schema represents the format of 
the
output for that operator.  The schema can be null, which indicates that the 
format of
the output for that operator is unknown.  In general the notion of unknownness 
in a
schema will be contagious.  Take for example:

{{{
A = load 'file1' as (x: int, y: float);
B = load 'file2';
C = cogroup A by x, B by $0;
D = foreach C generate flatten(A), flatten(B);
}}}

A will have a schema, since one is specified for it.  B will not have a schema, 
since
one is not specified.  C will have a schema, because the schema of (co)group is 
always
known.  Note however that in C's schema, the bag A will have a schema, and the 
bag B
will not.  This means that D will not have a schema, because the output of 
flatten(B) is
not known.  If D is changed to be `D = foreach C generate flatten(A);` then D 
will
have a schema, since the format of flatten(A) is known.
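
The contagion rule can be sketched in a few lines.  This is only an illustration under simplifying assumptions: a schema is modelled as a plain list of field names (the proposed `LogicalSchema` is not defined here), `null` stands for an unknown schema, and `SchemaSketch` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: a schema is a list of field names, or null when unknown.
final class SchemaSketch {
    private SchemaSketch() { }

    // Output schema of "foreach ... generate e1, e2, ...": the concatenation of
    // each expression's fields, or null as soon as any expression is unknown.
    static List<String> generate(List<List<String>> expressionSchemas) {
        List<String> out = new ArrayList<>();
        for (List<String> s : expressionSchemas) {
            if (s == null) {
                return null; // one unknown input makes the whole output unknown
            }
            out.addAll(s);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> flattenA = Arrays.asList("x", "y"); // A was loaded with a schema
        List<String> flattenB = null;                    // B was loaded without one
        System.out.println(generate(Arrays.asList(flattenA, flattenB)));
        System.out.println(generate(Collections.singletonList(flattenA)));
    }
}
```

The first call models `generate flatten(A), flatten(B)` and yields an unknown schema; the second models `generate flatten(A)` and yields the fields of A.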

!LogicalPlan will contain `add` and `removeLogical` operations specifically 
designed for manipulating logical plans.  These will be the only operations 
supported on the plan.

{{{

package org.apache.pig.experimental.logical.relational;

/**
 * LogicalPlan is the logical view of relational operations Pig will execute 
 * for a given script.  Note that it contains only relational operations.
 * All expressions will be contained in LogicalExpressionPlans inside
 * each relational operator.  LogicalPlan provides operations for
 * removing and adding LogicalRelationalOperators.  These will handle doing
 * all of the necessary add, remove, connect, and disconnect calls in
 * OperatorPlan.  They will not handle patching up individual relational
 * operators.  That will be handled by the various Patchers.
 *
 */
public class LogicalPlan extends OperatorPlan {
    
    /**
     * Add a relational operation to the plan.
     * @param before operator that will be before the new operator.  This
     * operator should already be in the plan.  If before is null then
     * the new operator will be a root.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param after operator that will be after the new operator.  This
     * operator should already be in the plan.  If after is null, then the
     * new operator will be a leaf.
     * @throws IOException if newOper is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(LogicalRelationalOperator before,
                    LogicalRelationalOperator newOper,
                    LogicalRelationalOperator after) throws IOException { ... }
   
    /**
     * Add a relational operation with multiple inputs to the plan.
     * @param before operators that will be before the new operator.  These
     * operators should already be in the plan.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param after operator that will be after the new operator.  This
     * operator should already be in the plan.  If after is null, then the
     * new operator will be a leaf.
     * @throws IOException if newOper is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(List<LogicalRelationalOperator> before,
                    LogicalRelationalOperator newOper,
                    LogicalRelationalOperator after) throws IOException { ... }
    
    /**
     * Add a relational operation with multiple outputs to the plan.
     * @param before operator that will be before the new operator.  This
     * operator should already be in the plan.  If before is null then
     * the new operator will be a root.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param after operators that will be after the new operator.  These
     * operators should already be in the plan.
     * @throws IOException if newOper is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(LogicalRelationalOperator before,
                    LogicalRelationalOperator newOper,
                    List<LogicalRelationalOperator> after) throws IOException { ... }
    
    /**
     * Add a relational operation to the plan when the caller wants to control
     * how the nodes are connected in the graph.
     * @param before operator that will be before the new operator.  This
     * operator should already be in the plan.  before should not be null.
     * @param beforeToPos Position in before's edges to connect newOper at.
     * @param beforeFromPos Position in newOper's edges to connect before at.
     * @param newOper new operator to add.  This operator should not already
     * be in the plan.
     * @param afterToPos Position in after's edges to connect newOper at.
     * @param afterFromPos Position in newOper's edges to connect after at.
     * @param after operator that will be after the new operator.  This
     * operator should already be in the plan.  If after is null, then the
     * new operator will be a leaf.
     * @throws IOException if newOper is already in the plan, or before or after
     * are not in the plan.
     */
    public void add(LogicalRelationalOperator before,
                    int beforeToPos,
                    int beforeFromPos,
                    LogicalRelationalOperator newOper,
                    int afterToPos,
                    int afterFromPos,
                    LogicalRelationalOperator after) throws IOException { ... }
    
    /**
     * Remove an operator from the logical plan.  This call will take care
     * of disconnecting the operator, connecting the predecessor(s) and 
     * successor(s) and patching up the plan. 
     * @param op operator to be removed.
     * @throws IOException If the operator is not in the plan.
     */
    public void removeLogical(LogicalRelationalOperator op) throws IOException { ... }
        
}


}}}

A !LogicalRelationalOperator will be the logical representation of a relational 
operator (join, sort, etc.).

{{{

package org.apache.pig.experimental.logical.relational;

/**
 * Logical representation of relational operators.  Relational operators have
 * a schema.
 */
abstract public class LogicalRelationalOperator extends Operator {
    
    protected LogicalSchema schema;
    protected int requestedParallelism;
    protected String alias;
    protected int lineNum;

    /**
     * 
     * @param name of this operator
     * @param plan this operator is in
     */
    public LogicalRelationalOperator(String name, OperatorPlan plan) { ... }
    
    /**
     * 
     * @param name of this operator
     * @param plan this operator is in
     * @param rp requested parallelism
     */
    public LogicalRelationalOperator(String name,
                                     OperatorPlan plan,
                                     int rp) { ... }
    
    /**
     * Get the schema for the output of this relational operator.  This does
     * not merely return the schema variable.  If schema is not yet set, this
     * will attempt to construct it.  Therefore it is abstract since each
     * operator will need to construct its schema differently.
     * @return the schema
     */
    abstract public LogicalSchema getSchema();
    
    /**
     * Get the requestedParallelism for this operator.
     * @return requestedParallelism
     */
    public int getRequestedParallelism() { ... }
    
    /**
     * Get the alias of this operator.  That is, if the Pig Latin for this
     * operator was 'X = order W by $0' then the alias will be X.  For store and
     * split it will be the alias being stored or split.  Note that because of
     * this, the alias is not guaranteed to be unique to a single operator.
     * @return alias
     */
    public String getAlias() { ... }
    
    /**
     * Get the line number in the submitted Pig Latin script where this operator
     * occurred.
     * @return line number
     */
    public int getLineNumber() { ... }

}

}}}

!LogicalSchema will be based on the existing Schema class.  It is hoped that 
this class can be greatly simplified.

!LogicalExpressionPlan will extend !OperatorPlan and contain 
!LogicalExpressionOperators.  Often expression trees are built with the 
expressions themselves containing references to the
next expression in the tree.  For example, a common implementation would be 
something like:

{{{
abstract class BinaryExpression {
    Expression leftHandSide;
    Expression rightHandSide;
}

class Plus extends BinaryExpression {
...
}

}}}

Since we already have a plan structure we will have a !LogicalExpressionPlan.  
This has the advantage that !PlanVisitors will work with expression trees and 
we do not need to invent a separate
visitor hierarchy. What operations !LogicalExpressionPlan will need is not yet 
clear.

!LogicalExpressionOperators will have a data type (the type they return) and a 
unique identifier (uid).  The point of the uid is to allow the optimizer to 
track how expressions flow through
the tree.  So projection expressions will have the same uid as the expression 
they are projecting.  All other expressions will create a new uid, since they 
are changing the value of the
expression.

Consider the following example:

{{{
A = load 'file1' as (x:int, y:int);
B = filter A by x > 0 and y > 0;
C = foreach B generate x + y;
}}}

In this case x and y will be assigned uids in load, where they first enter the 
script.
The output of filter will maintain these same uids since filter does not alter 
the
format of its input.  But the output of foreach will have a different uid, 
since this
creates a new value in the script.
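
The uid rules described above can be sketched as follows.  All names here (`UidField`, `UidSketch`, and its methods) are hypothetical and chosen only for illustration; how uids will actually be generated and stored is not settled by this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical field descriptor: a name plus the uid that tracks its value.
final class UidField {
    final String name;
    final long uid;
    UidField(String name, long uid) { this.name = name; this.uid = uid; }
}

final class UidSketch {
    private static final AtomicLong NEXT_UID = new AtomicLong(1);

    private UidSketch() { }

    // load: every field first enters the script here, so each gets a fresh uid.
    static List<UidField> load(String... names) {
        List<UidField> out = new ArrayList<>();
        for (String n : names) {
            out.add(new UidField(n, NEXT_UID.getAndIncrement()));
        }
        return out;
    }

    // filter: rows may be dropped but fields are untouched, so uids carry over.
    static List<UidField> filter(List<UidField> input) {
        return new ArrayList<>(input);
    }

    // projection: the same value flows through, so the uid is the same.
    static UidField project(List<UidField> input, String name) {
        for (UidField f : input) {
            if (f.name.equals(name)) return f;
        }
        throw new IllegalArgumentException("no such field: " + name);
    }

    // Any other expression (here x + y) creates a new value, hence a new uid.
    static UidField add(UidField left, UidField right) {
        return new UidField(left.name + "+" + right.name, NEXT_UID.getAndIncrement());
    }
}
```

Because the projected `x` keeps the uid assigned in load, an optimizer rule could recognize that a filter on `x` refers to the same value at any point before the `x + y` expression replaces it.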

Hopefully an example will make all of this somewhat clearer.  Consider the 
following
script:

{{{
A = load 'input1' as (x: int, y: chararray);
B = load 'input2' as (u: int, v: float);
C = filter A by x is not null;
D = filter B by u is not null and v > 0.0;
E = join C by x, D by u;
F = group E by x;
G = foreach F {
    H = E.y;
    I = distinct H;
    J = order E by v;
    generate group, COUNT(I), CUMULATIVE(J);
}
store G into 'output';
}}}

That script will produce a logical plan that looks like the following:

{{attachment:logicalplan.jpg}}

The !LOFilter for D will have an expression plan that looks like:

{{attachment:expressiontree.jpg}}

=== Changes to the Optimizer ===
The following changes will be made to the optimizer:
 1. Currently all rules are handed to the optimizer at once, and it iterates 
over them until none of the rules trigger or it reaches the maximum number of 
iterations.  This will be changed so that rules are collected into sets.  The 
optimizer will then iterate over rules in each set until none of the rules 
trigger or it reaches the maximum number of iterations.  The reason for this 
change will be made clear below.
 1. Currently the plan itself has the knowledge of how to patch itself up after 
it is rearranged.  (For example, how to reconstruct schemas after a plan is 
changed.)  This will be changed so that instead the optimizer can register a 
number of listeners (aka observers) on the plan.  These listeners will then be 
invoked after each rule that modifies the plan.  In this way the plans 
themselves need not understand how to patch up changes made by an optimization 
rule.  Also as we expand the plans and they record more information, it is easy 
to add new listeners without having to interact with existing functionality.
 1. The Rule class will be merged with the existing !RuleMatcher class so that 
Rule takes on the functionality of matching.  This match routine will be 
written once in Rule and extensions of rule need not re-implement it.

The goal in the above changes is to radically simplify writing optimizer rules. 
 Consider a rule to push a filter above a join.

{{{
A = load 'file1' as (x, y);
B = load 'file2' as (u, v);
C = join A by x, B by u;
D = filter C by (y > 0 or v > 0) and x > 0 and u > 0 and y > v;
}}}

In the current design, to push this filter, a rule must know how to split 
filters, how to push the parts that are pushable, and how to reconstruct the 
filters from the parts that are not.  In the new
proposal we can instead create three rules.  Rule 1 will only know how to split 
filters.  Rule 2 will only know how to push them.  And Rule 3 will only know 
how to reconstitute them.  These rules
can then be placed in separate sets, so that they do not interfere with each 
other.  So in this example, after Rule 1 has run, the script will conceptually 
look like

{{{
A = load 'file1' as (x, y);
B = load 'file2' as (u, v);
C = join A by x, B by u;
D1 = filter C by (y > 0 or v > 0);
D2 = filter D1 by  x > 0;
D3 = filter D2 by u > 0;
D = filter D3 by y > v;
}}}

Since Rule 1 will be run repeatedly it need not manage entirely splitting the 
filter.  It can be written to simply split one `and`, allowing the next 
iteration to split any subsequent `and`s.
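
A rule of this shape might look like the sketch below.  It is a simplified illustration, not the proposed implementation: a chain of consecutive filters is modelled as a list of predicates, and `Pred`, `Leaf`, `And`, and `SplitFilterSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical predicate tree: either a leaf condition or an AND of two predicates.
abstract class Pred { }

final class Leaf extends Pred {
    final String text;
    Leaf(String text) { this.text = text; }
    @Override public String toString() { return text; }
}

final class And extends Pred {
    final Pred left;
    final Pred right;
    And(Pred left, Pred right) { this.left = left; this.right = right; }
    @Override public String toString() { return "(" + left + " and " + right + ")"; }
}

final class SplitFilterSketch {
    private SplitFilterSketch() { }

    // filters holds the predicates of a chain of consecutive filters, in order.
    // Splits the first filter whose top-level predicate is an AND into two
    // consecutive filters; returns false when there is nothing left to split.
    static boolean splitOne(List<Pred> filters) {
        for (int i = 0; i < filters.size(); i++) {
            if (filters.get(i) instanceof And) {
                And and = (And) filters.get(i);
                filters.set(i, and.left);       // first half stays in place
                filters.add(i + 1, and.right);  // second half follows it
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Pred original = new And(new And(new And(
                new Leaf("(y > 0 or v > 0)"), new Leaf("x > 0")),
                new Leaf("u > 0")), new Leaf("y > v"));
        List<Pred> filters = new ArrayList<>();
        filters.add(original);
        // The optimizer's iteration does the looping; the rule splits one AND per call.
        while (splitOne(filters)) { }
        System.out.println(filters);
    }
}
```

Each call does a single split and reports whether it changed anything, so repeated invocation by the optimizer is what eventually produces one filter per conjunct.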

After several iterations of Rule 2, the script will look like:

{{{
A = load 'file1' as (x, y);
D2 = filter A by  x > 0;
B = load 'file2' as (u, v);
D3 = filter B by u > 0;
C = join D2 by x, D3 by u;
D1 = filter C by (y > 0 or v > 0);
D = filter D1 by y > v;
}}}

Again, Rule 2 does not need to do this in one pass.  It can concentrate on pushing 
filters past the join one at a time.  (Most likely there will be a Rule 2.1 
that will handle swapping filters so that we can get D2 past D1 and then apply 
Rule 2 to push D2 before C.)

And finally, after Rule 3 has run, the script will look like:

{{{
A = load 'file1' as (x, y);
D2 = filter A by  x > 0;
B = load 'file2' as (u, v);
D3 = filter B by u > 0;
C = join D2 by x, D3 by u;
D = filter C by (y > 0 or v > 0) and y > v;
}}}

Writing each of these rules will be much simpler than writing one large rule 
that must handle all three cases.

After each of these rules modifies the tree, listeners will be notified that the 
tree has changed.  The currently known listeners are one to reconstruct schemas 
based on the changes and one to
assign uids to expressions in the tree.

As part of the prototype we need to identify a set of test case rules that we 
believe represent operations that will need to be done on the plan.  Currently 
we have the following in this list:
 * Pushing filter past join
 * Pushing filter above foreach with flatten
 * Pushing filter above group
 * Pushing filter above sort
 * Pushing filter above split
 * Pushing filter above union
 * Pushing filter above cross
 * Pushing foreach with flatten below cogroup, join, cross, union
 * Combine limit and sort
 * Combine load with stream
 * Combine stream with store
 * Combine two filters
 * Combine two foreachs
 * Combine two joins
