Santhosh Srinivasan commented on PIG-697:

Problem: Find a sub-graph within a directed acyclic graph (DAG) aka pattern 

For optimization, a common process is to find patterns in a graph and rewire 
the graph to have an optimized version of the pattern. The problem of finding a 
sub-graph within a graph is the well known problem, sub-graph isomorphism, a NP 
complete problem. Within the context of PIG, the problem is recast as finding a 
sub DAG within a DAG.

The problem is divided into multiple sub-problems of representing the pattern 
(sub DAG) and finding the pattern within the DAG.

Representing the pattern.

The objective is to use the same optimizer framework to implement rule 
specification across the board, i.e., logical plans, physical plans and map 
reduce plans. In order to facilitate this, a new plan called RulePlan will be 
designed. The RulePlan will subclass the OperatorPlan and will be typed on 

A RuleNode will extend the Operator class and will be annoated with the 
following member variables:

   1. mNodeClass: A private member variable that is of type Class to denote the 
class of the node. E.g.: mNodeClass could be LOFilter, LOLoad, POFilter, etc.
   2. mNodeType: An enum that idenitifes a node to be a simple node, a multi 
node or a common node. A brief descriptions of the three kinds of nodes follow.
      * Simple node: A normal rule node in the rule plan
      * Multi node: A node that appears more than once in the rule plan
      * Common node: A node that is common to more than one path, i.e., has 
multiple incoming edges or multiple outgoing edges. 

Finding the pattern.

The existing RuleMatcher class uses dependency order or depth first order to 
traverse the graph. This ensures that a given node is not seen more than once. 
Currently, the RuleMatcher relies on the list of nodes (provided as input) and 
picks the first node in the list as the root of the pattern. With the rule plan 
approach, the algorithm will be modified to look for all the roots of the 

For each node in the matched path, the number of edges per node should match 
the number of edges for the corresponding node in the RulePlan. In addition, 
instead of looking for one edge from each node, the RuleMatcher will look for 
all edges from a given node in the RulePlan. The implication of this change, is 
that each RulePlan should be self contained, i.e., there cannot be any dangling 
edges out of the roots and into the intermediate and leaf nodes in the RulePlan.

When a set of matches is found, the following algorithm will compute the 

if there is a common node then
        for all matches m
                for all remaining matches r
                        if(common_nodes(m) == common_nodes(r)) then
                                put list of roots of each match into 
final_match list
                        end if
                end for
        end for
        all matches have been found
end if

> Proposed improvements to pig's optimizer
> ----------------------------------------
>                 Key: PIG-697
>                 URL: https://issues.apache.org/jira/browse/PIG-697
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>            Reporter: Alan Gates
>            Assignee: Alan Gates
> I propose the following changes to pig optimizer, plan, and operator 
> functionality to support more robust optimization:
> 1) Remove the required array from Rule.  This will change rules so that they 
> only match exact patterns instead of allowing missing elements in the pattern.
> This has the downside that if a given rule applies to two patterns (say 
> Load->Filter->Group, Load->Group) you have to write two rules.  But it has 
> the upside that
> the resulting rules know exactly what they are getting.  The original intent 
> of this was to reduce the number of rules that needed to be written.  But the
> resulting rules have do a lot of work to understand the operators they are 
> working with.  With exact matches only, each rule will know exactly the 
> operators it
> is working on and can apply the logic of shifting the operators around.  All 
> four of the existing rules set all entries of required to true, so removing 
> this
> will have no effect on them.
> 2) Change PlanOptimizer.optimize to iterate over the rules until there are no 
> conversions or a certain number of iterations has been reached.  Currently the
> function is:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         for (Rule rule : mRules) {
>             if (matcher.match(rule)) {
>                 // It matches the pattern.  Now check if the transformer
>                 // approves as well.
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches)
>                 {
>                       if (rule.transformer.check(match)) {
>                           // The transformer approves.
>                           rule.transformer.transform(match);
>                       }
>                 }
>             }
>         }
>     }
> {code}
> It would change to be:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         boolean sawMatch;
>         int iterators = 0;
>         do {
>             sawMatch = false;
>             for (Rule rule : mRules) {
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches) {
>                     // It matches the pattern.  Now check if the transformer
>                     // approves as well.
>                     if (rule.transformer.check(match)) {
>                         // The transformer approves.
>                         sawMatch = true;
>                         rule.transformer.transform(match);
>                     }
>                 }
>             }
>             // Not sure if 1000 is the right number of iterations, maybe it
>             // should be configurable so that large scripts don't stop too 
>             // early.
>         } while (sawMatch && numIterations++ < 1000);
>     }
> {code}
> The reason for limiting the number of iterations is to avoid infinite loops.  
> The reason for iterating over the rules is so that each rule can be applied 
> multiple
> times as necessary.  This allows us to write simple rules, mostly swaps 
> between neighboring operators, without worrying that we get the plan right in 
> one pass.
> For example, we might have a plan that looks like:  
> Load->Join->Filter->Foreach, and we want to optimize it to 
> Load->Foreach->Filter->Join.  With two simple
> rules (swap filter and join and swap foreach and filter), applied 
> iteratively, we can get from the initial to final plan, without needing to 
> understanding the
> big picture of the entire plan.
> 3) Add three calls to OperatorPlan:
> {code}
> /**
>  * Swap two operators in a plan.  Both of the operators must have single
>  * inputs and single outputs.
>  * @param first operator
>  * @param second operator
>  * @throws PlanException if either operator is not single input and output.
>  */
> public void swap(E first, E second) throws PlanException {
>     ...
> }
> /**
>  * Push one operator in front of another.  This function is for use when
>  * the first operator has multiple inputs.  The caller can specify
>  * which input of the first operator the second operator should be pushed to.
>  * @param first operator, assumed to have multiple inputs.
>  * @param second operator, will be pushed in front of first
>  * @param inputNum, indicates which input of the first operator the second
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if inputNum does not exist for first operator
>  */
> public void pushBefore(E first, E second, int inputNum) throws PlanException {
>     ...
> }
> /**
>  * Push one operator after another.  This function is for use when the second
>  * operator has multiple outputs.  The caller can specify which output of the
>  * second operator the first operator should be pushed to.
>  * @param first operator, will be pushed after the second operator
>  * @param second operator, assumed to have multiple outputs
>  * @param outputNum indicates which output of the second operator the first 
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if outputNum does not exist for second operator
>  */
> public void pushAfter(E first, E second, int outputNum) throws PlanException {
>     ...
> }
> {code}
> The rules in the optimizer can use these three functions, along with the 
> existing insertBetween(), replace(), and removeAndReconnect() calls to 
> operate on the
> plan.
> 4) Add a new call to Operator:
> {code}
> /**
>  * Make any necessary changes to a node based on a change of position in the
>  * plan.  This allows operators to rewire their projections, etc. when they
>  * are relocated in a plan.
>  * @param oldPred Operator that was previously the predecessor.
>  * @param newPred Operator thwas will now be the predecessor.
>  * @throws PlanException
>  */
> public abstract void rewire(Operator oldPred, Operator newPred) throws 
> PlanException;
> {code}
> This method will be called by the swap, pushBefore, pushAfter, insertBetween, 
> replace, and removeAndReconnect in OperatorPlan whenever an operator is moved
> around so that the operator has a chance to make any necessary changes.  
> 5) Add new calls to LogicalOperator and PhysicalOperator
> {code}
> /**
>  * A struct detailing how a projection is altered by an operator.
>  */
> public class ProjectionMap {
>     /**
>      * Quick way for an operator to note that its input and output are the
>      * same.
>      */
>     public boolean noChange;
>     /**
>      * Map of field changes, with keys being the output fields of the 
>      * operator and values being the input fields.  Fields are numbered from
>      * 0.  So for a foreach operator derived from
>      * 'B = foreach A generate $0, $2, $3, udf($1)' 
>      * would produce a mapping of 0->0, 1->2, 2->3
>      */
>     public Map<Integer, Integer> mappedFields;
>     /**
>      * List of fields removed from the input.  This includes fields that were
>      * transformed, and thus are no longer the same fields.  Using the
>      * example foreach given under mappedFields, this list would contain '1'.
>      */
>     public List<Integer> removedFields;
>     /**
>      * List of fields in the output of this operator that were created by this
>      * operator.  Using the example foreach given under mappedFields, this 
> list
>      * would contain '3'.
>      */
>     public List<Integer> addedFields;
> }
> /**
>  * Produce a map describing how this operator modifies its projection.
>  * @returns ProjectionMap null indicates it does not know how the projection
>  * changes, for example a join of two inputs where one input does not have
>  * a schema.
>  */
> public abstract ProjectionMap getProjectionMap();
> /**
>  * Get a list of fields that this operator requires.  This is not necessarily
>  * equivalent to the list of fields the operator projects.  For example,
>  * a filter will project anything passed to it, but requires only the fields
>  * explicitly referenced in its filter expression.
>  * @return list of fields, numbered from 0.
>  */
> public abstract List<Integer> getRequiredFields();
> {code}
> These calls will be called by optimizer rules to determine whether or not a 
> swap can be done (for example, you can't swap two operators if the second one 
> uses a
> field added by the first), and once the swap is done they will be used by 
> rewire to understand how to map projections in the operators.
> 6)  It's not clear that the RuleMatcher, in its current form, will work with 
> rules that are not linear.  That is, it matches rules that look like:
> Operators {Foreach, Filter}
> Edges {0->1}
> But I don't know if it will match rules that look like:
> Operators {Scan, Scan, Join}
> Edges {0->2, 1->2}
> For the optimizer to be able to determine join types and operations with 
> splits, it will have to be able to do that.
> Examples of types of rules that is optimizer could support:
> 1) Pushing filters in front of joins.
> 2) Pushing foreachs with flattens (which thus greathly expand the data) down 
> the tree past filters, joins, etc.
> 3) Pushing type casting used for schemas in loads down to the point where the 
> field is actually used.
> 4) Deciding when to do fragment/replicate join or sort/merge join instead of 
> the standard hash join.
> 5) The current optimizations:  pushing limit up the tree, making implicit 
> splits explicit, merge load and stream where possible, using the combiner.
> 6) Merge filters or foreachs where possible
> In particular the combiner optimizer hopefully can be completely rewritten to 
> use the optimizer framework to make decisions about how to rework physical 
> plans
> to push work into the combiner.

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.

Reply via email to