Author: gates
Date: Fri Sep  5 16:32:57 2008
New Revision: 692579

URL: http://svn.apache.org/viewvc?rev=692579&view=rev
Log:
PIG-401 Fixed issues with implicit splits being inserted.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep  5 
16:32:57 2008
@@ -38,17 +38,13 @@
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
@@ -61,10 +57,7 @@
 import 
org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.SplitIntroducer;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.util.WrappedIOException;
 import org.apache.pig.impl.util.PropertiesUtil;
@@ -569,7 +562,7 @@
         PlanSetter ps = new PlanSetter(lp);
         ps.visit();
         
-        (new SplitIntroducer(lp)).introduceImplSplits();
+        //(new SplitIntroducer(lp)).introduceImplSplits();
         
         // run through validator
         CompilationMessageCollector collector = new 
CompilationMessageCollector() ;

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
 Fri Sep  5 16:32:57 2008
@@ -36,23 +36,40 @@
 
         // List of rules for the logical optimizer
         
-        // Add type casting to plans where the schema has been declared (by
-        // user, data, or data catalog).
+        // This one has to be first, as the type cast inserter expects the
+        // load to only have one output.
+        // Find any places in the plan that have an implicit split and make
+        // it explicit.  Since the RuleMatcher doesn't handle trees properly,
+        // we cheat and say that we match any node.  Then we'll do the actual
+        // test in the transformer's check method.
         List<String> nodes = new ArrayList<String>(1);
-        nodes.add("org.apache.pig.impl.logicalLayer.LOLoad");
         Map<Integer, Integer> edges = new HashMap<Integer, Integer>();
         List<Boolean> required = new ArrayList<Boolean>(1);
+        nodes.add("any");
         required.add(true);
-        mRules.add(new Rule(nodes, edges, required,
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges,
+            required, new ImplicitSplitInserter(plan)));
+        
+        // Add type casting to plans where the schema has been declared (by
+        // user, data, or data catalog).
+        nodes = new ArrayList<String>(1);
+        nodes.add("org.apache.pig.impl.logicalLayer.LOLoad");
+        edges = new HashMap<Integer, Integer>();
+        required = new ArrayList<Boolean>(1);
+        required.add(true);
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, 
required,
             new TypeCastInserter(plan)));
         
+        // Push up limit wherever possible.
         nodes = new ArrayList<String>(1);
         edges = new HashMap<Integer, Integer>();
         required = new ArrayList<Boolean>(1);
         nodes.add("org.apache.pig.impl.logicalLayer.LOLimit");
         required.add(true);
-        mRules.add(new Rule(nodes, edges, required,
-                new OpLimitOptimizer(plan)));
+        mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, 
required,
+            new OpLimitOptimizer(plan)));
+        
+        
     }
 
 }

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
 Fri Sep  5 16:32:57 2008
@@ -147,7 +147,28 @@
         // Insert it into the plan.
         mPlan.add(newNode);
         mPlan.insertBetween(after, newNode, before);
-
+        fixUpContainedPlans(after, newNode, before, projectionMapping);
+    }
+    
+    /**
+     * Once a node has been inserted, inner plans associated with other nodes
+     * may have references to the node that has been replaced or moved.  This
+     * function walks those inner plans and patches up references.
+     * @param after Node that has had a new node inserted after it.
+     * @param newNode node that has been inserted
+     * @param before Node that has had a new node inserted before it.
+     * @param projectionMapping A map that defines how projections in after
+     * relate to projections in newNode.  Keys are the projection offsets in
+     * after, values are the new offsets in newNode.  If this field is null,
+     * then it will be assumed that the mapping is 1-1.
+     * @throws VisitorException, FrontendException
+     */
+    protected void fixUpContainedPlans(
+            LogicalOperator after,
+            LogicalOperator newNode,
+            LogicalOperator before,
+            Map<Integer, Integer> projectionMapping) 
+    throws VisitorException, FrontendException {
         // Fix up COGroup internal wiring
         if (before instanceof LOCogroup) {
             LOCogroup cg = (LOCogroup) before ;
@@ -175,9 +196,6 @@
                 new ProjectFixerUpper(lp, newNode, projectionMapping);
             pfu.visit();
         }
-
-        // Now rebuild the schemas
-        // rebuildSchemas();
     }
 
     /**

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java 
(original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/SplitIntroducer.java 
Fri Sep  5 16:32:57 2008
@@ -1,107 +0,0 @@
-package org.apache.pig.impl.plan;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOSplitOutput;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-
-public class SplitIntroducer extends PlanWalker<LogicalOperator, LogicalPlan> {
-    private NodeIdGenerator nodeIdGen;
-    
-    public SplitIntroducer(LogicalPlan plan) {
-        super(plan);
-        nodeIdGen = NodeIdGenerator.getGenerator();
-    }
-    
-    private long getNextId(String scope) {
-        return nodeIdGen.getNextNodeId(scope);
-    }
-
-    @Override
-    public PlanWalker<LogicalOperator, LogicalPlan> 
spawnChildWalker(LogicalPlan plan) {
-        return new SplitIntroducer(plan);
-    }
-    
-    public void introduceImplSplits() throws VisitorException {
-        List<LogicalOperator> roots = copySucs(mPlan.getRoots());
-        if(roots == null) return;
-        for (LogicalOperator root : roots) {
-            processNode(root);
-        }
-    }
-
-    @Override
-    /**
-     * This method is to conform to the interface.
-     */
-    public void walk(PlanVisitor visitor) throws VisitorException {
-        throw new VisitorException(
-                "This method is not to be used. This Walker does not call any 
visit() methods. It only alters the plan by introducing implicit splits if 
necessary.");
-    }
-    
-    private void processNode(LogicalOperator root) throws VisitorException {
-        if(root instanceof LOSplit || root instanceof LOSplitOutput) return;
-        List<LogicalOperator> sucs = mPlan.getSuccessors(root);
-        if(sucs==null) return;
-        int size = sucs.size();
-        if(size==0 || size==1) return;
-        sucs = copySucs(mPlan.getSuccessors(root));
-        disconnect(root,sucs);
-        String scope = root.getOperatorKey().scope;
-        LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope, 
getNextId(scope)), new ArrayList<LogicalOperator>());
-        mPlan.add(splitOp);
-        try {
-            mPlan.connect(root, splitOp);
-            int index = -1;
-            for (LogicalOperator operator : sucs) {
-                LogicalPlan condPlan = new LogicalPlan();
-                LOConst cnst = new LOConst(mPlan,new OperatorKey(scope, 
getNextId(scope)), true);
-                cnst.setType(DataType.BOOLEAN);
-                condPlan.add(cnst);
-                LOSplitOutput splitOutput = new LOSplitOutput(mPlan, new 
OperatorKey(scope, getNextId(scope)), ++index, condPlan);
-                splitOp.addOutput(splitOutput);
-                mPlan.add(splitOutput);
-                mPlan.connect(splitOp, splitOutput);
-                mPlan.connect(splitOutput, operator);
-            }
-        } catch (PlanException e) {
-            throw new VisitorException(e);
-        }
-    }
-    private List<LogicalOperator> copySucs(List<LogicalOperator> successors){
-        ArrayList<LogicalOperator> ret = new ArrayList<LogicalOperator>();
-        for (LogicalOperator operator : successors) {
-            ret.add(operator);
-        }
-        return ret;
-    }
-    
-    private void disconnect(LogicalOperator from, List<LogicalOperator> 
successors) {
-        for (LogicalOperator operator : successors) {
-            mPlan.disconnect(from, operator);
-        }
-    }
-    
-   /* public static void main(String[] args) throws ExecException, 
IOException, FrontendException {
-        PigServer ser = new PigServer(ExecType.LOCAL);
-        ser.registerQuery("A = load 'file:/etc/passwd' using 
PigStorage(':');");
-        ser.registerQuery("B = foreach A generate $0;");
-        ser.registerQuery("C = foreach A generate $1;");
-        ser.registerQuery("D = group B by $0, C by $0;");
-        LogicalPlan lp = ser.getPlanFromAlias("D", "Testing");
-        lp.explain(System.out, System.err);
-        LogicalPlan fixedLp = ser.compileLp(lp, "Testing");
-        System.out.println();
-        fixedLp.explain(System.out, System.err);
-    }*/
-}

Modified: 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
 (original)
+++ 
incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/RuleMatcher.java
 Fri Sep  5 16:32:57 2008
@@ -161,7 +161,8 @@
         for (int i = 0; i < sz; i++) mMatch.add(null);
         
         List<O> successors = new ArrayList<O>();
-        if (node.getClass().getName().equals(mRule.nodes.get(0))) {
+        if (node.getClass().getName().equals(mRule.nodes.get(0)) || 
+                mRule.nodes.get(0).equals("any")) {
             mMatch.set(0, node);
             // Follow the edge to see the next node we should be looking for.
             Integer nextOpNum = mRule.edges.get(0);
@@ -180,7 +181,8 @@
     }
 
     private boolean continueMatch(O current, Integer nodeNumber) {
-        if (current.getClass().getName() == mRule.nodes.get(nodeNumber)) {
+        if (current.getClass().getName().equals(mRule.nodes.get(nodeNumber)) 
|| 
+                mRule.nodes.get(nodeNumber).equals("any")) {
             mMatch.set(nodeNumber, current);
 
             // Follow the edge to see the next node we should be looking for.

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java 
(original)
+++ 
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalOptimizer.java 
Fri Sep  5 16:32:57 2008
@@ -17,16 +17,11 @@
  */
 package org.apache.pig.test;
 
-import java.io.BufferedReader;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
 
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.optimizer.*;
 import org.apache.pig.test.utils.LogicalPlanTester;
-import org.apache.pig.test.utils.TypeCheckingTestUtil;
 
 import org.junit.Test;
 import org.junit.Before;

Modified: 
incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java
URL: 
http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java?rev=692579&r1=692578&r2=692579&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java 
(original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOperatorPlan.java 
Fri Sep  5 16:32:57 2008
@@ -660,6 +660,39 @@
         assertTrue(transformer.mTransformed);
     }
 
+    // Test that we match when the pattern says any.  Will give
+    // a pattern of any and a plan of S->S->M.
+    @Test
+    public void testOptimizerMatchesAny() throws Exception {
+        // Build a plan
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new MultiOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        // Create our rule
+        ArrayList<String> nodes = new ArrayList<String>(3);
+        nodes.add("any");
+        HashMap<Integer, Integer> edges = new HashMap<Integer, Integer>(2);
+        ArrayList<Boolean> required = new ArrayList<Boolean>(1);
+        required.add(true);
+        AlwaysTransform transformer = new AlwaysTransform(plan);
+        Rule<TOperator, TPlan> r =
+            new Rule<TOperator, TPlan>(nodes, edges, required, transformer);
+
+        TOptimizer optimizer = new TOptimizer(plan);
+        optimizer.addRule(r);
+
+        optimizer.optimize();
+        assertTrue(transformer.mTransformed);
+    }
+
     // Test that we match when the whole plan doesn't match.  Will give
     // a pattern of S->S->M and a plan of S->S->S->M.
     @Test


Reply via email to