[ https://issues.apache.org/jira/browse/PIG-697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12697146#action_12697146 ]

David Ciemiewicz commented on PIG-697:
--------------------------------------

Some thoughts on optimization problems and patterns, drawn from SQL, from coding 
Pig, and from my desire for a higher-level version of Pig than we have today.

I know this may come off as a "distraction", but hopefully you'll have some time 
to hear me out.

This comes out of:

* a conversation with Santhosh about the SQL to Pig translation work
* multiple issues I have encountered with nested foreach statements, including 
redundant function execution
* nested FOREACH statement "assignment" computation bugs
* hand coding chains of foreach statements so I can get the Algebraic combiner 
to kick in
* hand coding chains of foreach statements and grouping statements rather than 
using a single statement

I think I might have stumbled on a potentially improved model for Pig to Pig 
execution plan generation:

{code}
            High Level Pig to Low Level Pig translation
{code}

I think this would potentially benefit the SQL to Pig efforts and provide for 
programmer coding efficiency in Pig as well.

This will be a bit protracted, but I hope you have some time to consider it.

Take the following SQL idiom that the SQL to Pig translator will need to 
support:

{code}
            select
                        EXP(AVG(LN(time+0.1))) as geomean_time
            from
                        events
            where
                        time is not null and
                        time >= 0;
{code}

In "high level pig", I have wanted to code this as"
 
{code}
            A = load 'events' using PigStorage() as ( time: int );
            B = filter A by time is not null and time >= 0;
            C = group B all;
            D = foreach C generate EXP(AVG(LN(B.time+0.1))) as geomean_time;
{code}

In fact, this would seem to provide a nice translation path from SQL to "low 
level pig" via "high level pig".

Unfortunately, this won't work.  We developers must write Pig scripts at a 
lower level and break all of this apart into various steps.

An additional issue is that, because of some, um, workarounds in the execution 
plan optimizations, the combiner won't kick in unless we take further steps.

So the most "performant" version of the desired pig script is the following 
really "low level pig" where D is broken into 3 steps, merging one with B and 
the remaining 2 steps as separate D steps:

 
{code}
            A = load 'events' using PigStorage() as ( time: int );
            B = filter A by time is not null and time >= 0;
            B = foreach B generate LOG(time + 0.1) as log_time;
            C = group B all;
            D = foreach C generate group, AVG(B.log_time) as mean_log_time;
                -- note that projecting the group alias is required for the
                -- Algebraic combiner to kick in
            D = foreach D generate EXP(mean_log_time) as geomean_time;
{code}

If we can figure out how to translate SQL into this last "low-level" set of 
statements, why couldn't or shouldn't we have "high level pig" as well, and 
permit more efficient code writing and optimization?
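
To make the target of such a translation concrete, this is the general shape I 
keep hand-coding; the aliases and the f()/g() placeholders here are purely 
illustrative:

{code}
            -- 1. per-record expressions get pushed into a foreach *before* the group
            -- 2. the bare Algebraic aggregate stays in the foreach over the grouped
            --    data, with the group alias projected so the combiner can kick in
            -- 3. expressions over the aggregate move into a foreach *after* the group
            X = foreach INPUT generate f(col) as pre;           -- e.g. LOG(time + 0.1)
            G = group X all;
            Y = foreach G generate group, AVG(X.pre) as agg;    -- combiner-friendly
            Z = foreach Y generate g(agg) as result;            -- e.g. EXP(agg)
{code}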


Next example

I do a bunch of nested intermediate computations in a nested FOREACH statement:

{code}
C = foreach C {
        curr_mean_log_timetonextevent = curr_sum_log_timetonextevent / (double)count;
        curr_meansq_log_timetonextevent = curr_sumsq_log_timetonextevent / (double)count;
        curr_var_log_timetonextevent = curr_meansq_log_timetonextevent -
                (curr_mean_log_timetonextevent * curr_mean_log_timetonextevent);
        curr_sterr_log_timetonextevent = math.SQRT(curr_var_log_timetonextevent / (double)count);

        curr_geomean_timetonextevent = math.EXP(curr_mean_log_timetonextevent);
        curr_geosterr_timetonextevent = math.EXP(curr_sterr_log_timetonextevent);

        curr_mean_timetonextevent = curr_sum_timetonextevent / (double)count;
        curr_meansq_timetonextevent = curr_sumsq_timetonextevent / (double)count;
        curr_var_timetonextevent = curr_meansq_timetonextevent -
                (curr_mean_timetonextevent * curr_mean_timetonextevent);
        curr_sterr_timetonextevent = math.SQRT(curr_var_timetonextevent / (double)count);

        generate
            ...
{code}

The code for nested statements in Pig has been particularly problematic and 
buggy, including problems such as:

* redundant execution of functions such as SUM, AVG
* nested function problems
* mathematical operator problems (illustrated in this bug)
* no type propagation
* the need to use AS clauses to name nested alias assignments projected in the 
GENERATE clauses

What if, instead of trying to do all of these operations in some specialized 
execution code, this was treated as "high level" pig that translated all of 
these intermediate statements into two or more "low level" foreach expansions?

This isn't as wild as it seems because 9 times out of 10, the "workaround" that 
I have had to do is exactly that: I had to stop using nested foreach and 
instead break the code into two separate foreach statements chained together.

In other words, I went from the above nested foreach statement, which generated 
errors and didn't work, to two or more hand-coded foreach statements that did:

{code}
C = foreach C generate
            *,
            curr_sum_log_timetonextevent / (double)count as curr_mean_log_timetonextevent,
            curr_sumsq_log_timetonextevent / (double)count as curr_meansq_log_timetonextevent;

C = foreach C generate
            *,
            curr_meansq_log_timetonextevent -
                (curr_mean_log_timetonextevent * curr_mean_log_timetonextevent)
                as curr_var_log_timetonextevent;

C = foreach C generate
            *,
            math.SQRT(curr_var_log_timetonextevent / (double)count) as curr_sterr_log_timetonextevent;
{code}

This was the only way I could avoid the redundant computations and get the code 
to actually work. Well, actually, if I added casts in the appropriate places, it 
also worked, but what a pain.

This would also have the advantage that alias names used in the nested 
"assignments" would actually propagate without an "as" clause in the subsequent 
generate statement.
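
Roughly, the difference in naming, sketched with made-up aliases (sum_x, count, 
m, sterr_x):

{code}
-- nested form: the computed alias keeps its name only if it is repeated
-- in an AS clause in the GENERATE
C = foreach C {
        m = sum_x / (double)count;
        generate m as m, count as count;
};

-- chained form: the name given in the first foreach simply carries
-- into the next statement
C = foreach C generate *, sum_x / (double)count as m;
C = foreach C generate *, math.SQRT(m / (double)count) as sterr_x;
{code}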

I know this is a "brain fart" but it does have a time honored tradition in 
languages like C, C++, Lisp of using the language to "bootstrap" the language 
by translating from more "high level" idioms to less feature rich "low level" 
idioms.

It just seems like a plausible way of speeding up development of a SQL to Pig 
translator, as well as allowing a more rapid transition of Pig to higher-level 
idioms while correcting whole swaths of execution bugs and performance 
optimization issues.




> Proposed improvements to pig's optimizer
> ----------------------------------------
>
>                 Key: PIG-697
>                 URL: https://issues.apache.org/jira/browse/PIG-697
>             Project: Pig
>          Issue Type: Bug
>          Components: impl
>            Reporter: Alan Gates
>            Assignee: Alan Gates
>
> I propose the following changes to pig optimizer, plan, and operator 
> functionality to support more robust optimization:
> 1) Remove the required array from Rule.  This will change rules so that they 
> only match exact patterns instead of allowing missing elements in the pattern.
> This has the downside that if a given rule applies to two patterns (say 
> Load->Filter->Group, Load->Group) you have to write two rules.  But it has 
> the upside that
> the resulting rules know exactly what they are getting.  The original intent 
> of this was to reduce the number of rules that needed to be written.  But the
> resulting rules have to do a lot of work to understand the operators they are 
> working with.  With exact matches only, each rule will know exactly the 
> operators it
> is working on and can apply the logic of shifting the operators around.  All 
> four of the existing rules set all entries of required to true, so removing 
> this
> will have no effect on them.
> 2) Change PlanOptimizer.optimize to iterate over the rules until there are no 
> conversions or a certain number of iterations has been reached.  Currently the
> function is:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         for (Rule rule : mRules) {
>             if (matcher.match(rule)) {
>                 // It matches the pattern.  Now check if the transformer
>                 // approves as well.
>                 List<List<O>> matches = matcher.getAllMatches();
>                 for (List<O> match:matches)
>                 {
>                       if (rule.transformer.check(match)) {
>                           // The transformer approves.
>                           rule.transformer.transform(match);
>                       }
>                 }
>             }
>         }
>     }
> {code}
> It would change to be:
> {code}
>     public final void optimize() throws OptimizerException {
>         RuleMatcher matcher = new RuleMatcher();
>         boolean sawMatch;
>         int numIterations = 0;
>         do {
>             sawMatch = false;
>             for (Rule rule : mRules) {
>                 if (matcher.match(rule)) {
>                     // It matches the pattern.  Now check if the transformer
>                     // approves as well.
>                     List<List<O>> matches = matcher.getAllMatches();
>                     for (List<O> match : matches) {
>                         if (rule.transformer.check(match)) {
>                             // The transformer approves.
>                             sawMatch = true;
>                             rule.transformer.transform(match);
>                         }
>                     }
>                 }
>             }
>             // Not sure if 1000 is the right number of iterations, maybe it
>             // should be configurable so that large scripts don't stop too 
>             // early.
>         } while (sawMatch && numIterations++ < 1000);
>     }
> {code}
> The reason for limiting the number of iterations is to avoid infinite loops.  
> The reason for iterating over the rules is so that each rule can be applied 
> multiple
> times as necessary.  This allows us to write simple rules, mostly swaps 
> between neighboring operators, without worrying that we get the plan right in 
> one pass.
> For example, we might have a plan that looks like:  
> Load->Join->Filter->Foreach, and we want to optimize it to 
> Load->Foreach->Filter->Join.  With two simple
> rules (swap filter and join and swap foreach and filter), applied 
> iteratively, we can get from the initial to final plan, without needing to 
> understand the
> big picture of the entire plan.
> 3) Add three calls to OperatorPlan:
> {code}
> /**
>  * Swap two operators in a plan.  Both of the operators must have single
>  * inputs and single outputs.
>  * @param first operator
>  * @param second operator
>  * @throws PlanException if either operator is not single input and output.
>  */
> public void swap(E first, E second) throws PlanException {
>     ...
> }
> /**
>  * Push one operator in front of another.  This function is for use when
>  * the first operator has multiple inputs.  The caller can specify
>  * which input of the first operator the second operator should be pushed to.
>  * @param first operator, assumed to have multiple inputs.
>  * @param second operator, will be pushed in front of first
>  * @param inputNum indicates which input of the first operator the second
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if inputNum does not exist for first operator
>  */
> public void pushBefore(E first, E second, int inputNum) throws PlanException {
>     ...
> }
> /**
>  * Push one operator after another.  This function is for use when the second
>  * operator has multiple outputs.  The caller can specify which output of the
>  * second operator the first operator should be pushed to.
>  * @param first operator, will be pushed after the second operator
>  * @param second operator, assumed to have multiple outputs
>  * @param outputNum indicates which output of the second operator the first 
>  * operator will be pushed onto.  Numbered from 0.
>  * @throws PlanException if outputNum does not exist for second operator
>  */
> public void pushAfter(E first, E second, int outputNum) throws PlanException {
>     ...
> }
> {code}
> The rules in the optimizer can use these three functions, along with the 
> existing insertBetween(), replace(), and removeAndReconnect() calls to 
> operate on the
> plan.
> 4) Add a new call to Operator:
> {code}
> /**
>  * Make any necessary changes to a node based on a change of position in the
>  * plan.  This allows operators to rewire their projections, etc. when they
>  * are relocated in a plan.
>  * @param oldPred Operator that was previously the predecessor.
>  * @param newPred Operator that will now be the predecessor.
>  * @throws PlanException
>  */
> public abstract void rewire(Operator oldPred, Operator newPred) throws PlanException;
> {code}
> This method will be called by the swap, pushBefore, pushAfter, insertBetween, 
> replace, and removeAndReconnect in OperatorPlan whenever an operator is moved
> around so that the operator has a chance to make any necessary changes.  
> 5) Add new calls to LogicalOperator and PhysicalOperator
> {code}
> /**
>  * A struct detailing how a projection is altered by an operator.
>  */
> public class ProjectionMap {
>     /**
>      * Quick way for an operator to note that its input and output are the
>      * same.
>      */
>     public boolean noChange;
>     /**
>      * Map of field changes, with keys being the output fields of the 
>      * operator and values being the input fields.  Fields are numbered from
>      * 0.  So a foreach operator derived from
>      * 'B = foreach A generate $0, $2, $3, udf($1)' 
>      * would produce a mapping of 0->0, 1->2, 2->3
>      */
>     public Map<Integer, Integer> mappedFields;
>     /**
>      * List of fields removed from the input.  This includes fields that were
>      * transformed, and thus are no longer the same fields.  Using the
>      * example foreach given under mappedFields, this list would contain '1'.
>      */
>     public List<Integer> removedFields;
>     /**
>      * List of fields in the output of this operator that were created by this
>      * operator.  Using the example foreach given under mappedFields, this
>      * list would contain '3'.
>      */
>     public List<Integer> addedFields;
> }
> /**
>  * Produce a map describing how this operator modifies its projection.
>  * @return ProjectionMap; null indicates the operator does not know how the projection
>  * changes, for example a join of two inputs where one input does not have
>  * a schema.
>  */
> public abstract ProjectionMap getProjectionMap();
> /**
>  * Get a list of fields that this operator requires.  This is not necessarily
>  * equivalent to the list of fields the operator projects.  For example,
>  * a filter will project anything passed to it, but requires only the fields
>  * explicitly referenced in its filter expression.
>  * @return list of fields, numbered from 0.
>  */
> public abstract List<Integer> getRequiredFields();
> {code}
> These calls will be called by optimizer rules to determine whether or not a 
> swap can be done (for example, you can't swap two operators if the second one 
> uses a
> field added by the first), and once the swap is done they will be used by 
> rewire to understand how to map projections in the operators.
> 6)  It's not clear that the RuleMatcher, in its current form, will work with 
> rules that are not linear.  That is, it matches rules that look like:
> Operators {Foreach, Filter}
> Edges {0->1}
> But I don't know if it will match rules that look like:
> Operators {Scan, Scan, Join}
> Edges {0->2, 1->2}
> For the optimizer to be able to determine join types and operations with 
> splits, it will have to be able to do that.
> Examples of types of rules this optimizer could support:
> 1) Pushing filters in front of joins.
> 2) Pushing foreachs with flattens (which thus greatly expand the data) down 
> the tree past filters, joins, etc.
> 3) Pushing type casting used for schemas in loads down to the point where the 
> field is actually used.
> 4) Deciding when to do fragment/replicate join or sort/merge join instead of 
> the standard hash join.
> 5) The current optimizations:  pushing limit up the tree, making implicit 
> splits explicit, merging load and stream where possible, using the combiner.
> 6) Merging filters or foreachs where possible.
> In particular the combiner optimizer hopefully can be completely rewritten to 
> use the optimizer framework to make decisions about how to rework physical 
> plans
> to push work into the combiner.
