Author: daijy
Date: Fri Aug  6 17:24:52 2010
New Revision: 983062

URL: http://svn.apache.org/viewvc?rev=983062&view=rev
Log:
PIG-1496: Mandatory rule ImplicitSplitInserter

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java?rev=983062&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java Fri Aug  6 17:24:52 2010
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+
+/**
+ * Mandatory rule that makes implicit splits explicit: for every non-split,
+ * non-store operator with two or more successors, an LOSplit is inserted
+ * after the operator, and each original successor is reconnected to its own
+ * LOSplitOutput (with a constant "true" filter expression).
+ */
+public class ImplicitSplitInserter extends Rule {
+
+    public ImplicitSplitInserter(String n) {
+        super(n, true);
+    }
+
+    @Override
+    public List<OperatorPlan> match(OperatorPlan plan) throws FrontendException {
+        // Look for any non-split, non-store node with two or more
+        // successors; each such node is a match.
+        currentPlan = plan;
+        List<OperatorPlan> ll = new ArrayList<OperatorPlan>();
+        Iterator<Operator> ops = plan.getOperators();
+        while (ops.hasNext()) {
+            Operator op = ops.next();
+            if (op instanceof LOSplit || op instanceof LOStore)
+                continue;
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null && succs.size() >= 2) {
+                OperatorPlan match = new LogicalPlan();
+                match.add(op);
+                ll.add(match);
+            }
+        }
+        return ll;
+    }
+    
+    @Override
+    public Transformer getNewTransformer() {
+      return new ImplicitSplitInserterTransformer();
+    }
+    
+    public class ImplicitSplitInserterTransformer extends Transformer {
+      @Override
+      public boolean check(OperatorPlan matched) throws FrontendException {
+        return true;
+      }
+      
+      @Override 
+      public void transform(OperatorPlan matched) throws FrontendException {
+        // The matched sub-plan must contain exactly one operator, and that
+        // operator must not already be a split or a store (match() never
+        // returns those, so this is a sanity check).
+        if (matched == null || matched.size() != 1
+            || matched.getSources().get(0) instanceof LOSplit
+            || matched.getSources().get(0) instanceof LOStore)
+          throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2244);
+
+        // For an operator op with two successors, the change required
+        // looks like this:
+        // BEFORE:
+        // Succ1  Succ2
+        //  \       /
+        //      op
+        
+        // AFTER:
+        // Succ1          Succ2
+        //   |              |
+        // SplitOutput SplitOutput
+        //      \       /
+        //        Split
+        //          |
+        //          op
+        
+        Operator op = matched.getSources().get(0);
+        List<Operator> succs = currentPlan.getSuccessors(op);
+        if (succs == null || succs.size() < 2)
+          throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2243);
+        LOSplit splitOp = new LOSplit(currentPlan);
+        splitOp.setAlias(((LogicalRelationalOperator) op).getAlias());
+        Operator[] sucs = succs.toArray(new Operator[0]);
+        currentPlan.add(splitOp);
+        currentPlan.connect(op, splitOp);
+        for (Operator suc : sucs) {
+          // position is remembered in order to maintain the order of the successors
+          Pair<Integer, Integer> pos = currentPlan.disconnect(op, suc);
+          LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+          // constant "true" filter expression; the ConstantExpression
+          // registers itself with filterPlan when constructed
+          LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BOOLEAN);
+          new ConstantExpression(filterPlan, Boolean.valueOf(true), fs);
+          LOSplitOutput splitOutput = new LOSplitOutput((LogicalPlan) currentPlan, filterPlan);
+          splitOutput.setAlias(splitOp.getAlias());
+          currentPlan.add(splitOutput);
+          currentPlan.connect(splitOp, splitOutput);
+          currentPlan.connect(splitOutput, pos.first, suc, pos.second);
+        }
+      }
+      
+      @Override
+      public OperatorPlan reportChanges() {
+        return currentPlan;
+      }
+    }
+    
+    @Override
+    protected OperatorPlan buildPattern() {
+        return null;
+    }
+}

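For reference, the constructor above passes true as the second argument to super(n, true), which (per the commit log) marks the rule as mandatory. The sketch below is illustrative only and not part of this patch: it shows how such a rule might be grouped into a List<Set<Rule>> rule-set structure. The helper class and method names are hypothetical; only the ImplicitSplitInserter(String) constructor and the Rule type are taken from the code above.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
    import org.apache.pig.newplan.optimizer.Rule;

    // Hypothetical wiring sketch: a single rule set containing only the
    // mandatory ImplicitSplitInserter rule.
    public class ImplicitSplitRuleSetSketch {
        public static List<Set<Rule>> buildRuleSets() {
            List<Set<Rule>> ruleSets = new ArrayList<Set<Rule>>();
            Set<Rule> implicitSplit = new HashSet<Rule>();
            // The string argument is only the rule's display name; the rule
            // marks itself mandatory in its own constructor.
            implicitSplit.add(new ImplicitSplitInserter("ImplicitSplitInserter"));
            ruleSets.add(implicitSplit);
            return ruleSets;
        }
    }

Assuming the optimizer exhausts one rule set before moving on to the next, keeping ImplicitSplitInserter in an early set of its own would let later rules assume that operators with multiple consumers have already been rewritten through explicit splits.
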
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java?rev=983062&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java Fri Aug  6 17:24:52 2010
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printTypeGraph;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNewPlanImplicitSplit {
+    private PigServer pigServer;
+    static MiniCluster cluster = MiniCluster.buildCluster();
+    
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "true");
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+    
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        cluster.shutDown();
+    }
+    
+    @Test
+    public void testImplicitSplit() throws Exception{
+        int LOOP_SIZE = 20;
+        String[] input = new String[LOOP_SIZE];
+        for(int i = 1; i <= LOOP_SIZE; i++) {
+            input[i-1] = Integer.toString(i);
+        }
+        String inputFileName = "testImplicitSplit-input.txt";
+        Util.createInputFile(cluster, inputFileName, input);
+        pigServer.registerQuery("A = LOAD '" + inputFileName + "';");
+        pigServer.registerQuery("B = filter A by $0<=10;");
+        pigServer.registerQuery("C = filter A by $0>10;");
+        pigServer.registerQuery("D = union B,C;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        if(!iter.hasNext()) fail("No Output received");
+        int cnt = 0;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            ++cnt;
+        }
+        assertEquals(20, cnt);
+        Util.deleteFile(cluster, inputFileName);
+    }
+    
+    @Test
+    public void testImplicitSplitInCoGroup() throws Exception {
+        // this query is similar to the one reported in JIRA - PIG-537
+        // Create input file
+        String input1 = "testImplicitSplitInCoGroup-input1.txt";
+        String input2 = "testImplicitSplitInCoGroup-input2.txt";
+        Util.createInputFile(cluster, input1, 
+                new String[] {"a:1", "b:2", "b:20", "c:3", "c:30"});
+        Util.createInputFile(cluster, input2, 
+                new String[] {"a:first", "b:second", "c:third"});
+        pigServer.registerQuery("a = load '" + input1 + 
+                "' using PigStorage(':') as (name:chararray, marks:int);");
+        pigServer.registerQuery("b = load '" + input2 + 
+                "' using PigStorage(':') as (name:chararray, 
rank:chararray);");
+        pigServer.registerQuery("c = cogroup a by name, b by name;");
+        pigServer.registerQuery("d = foreach c generate group, 
FLATTEN(a.marks) as newmarks;");
+        pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+        pigServer.registerQuery("f = foreach e generate group, flatten(a), 
flatten(d);");
+        HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+        results.put(1, new Object[] { "a", 1, "a", 1 });
+        results.put(2, new Object[] { "b", 2, "b", 2 });
+        results.put(3, new Object[] { "c", 3, "c", 3 });
+        results.put(20, new Object[] { "b", 20, "b", 20 });
+        results.put(30, new Object[] { "c", 30, "c", 30 });
+        pigServer.explain("f", System.out);
+        
+        Iterator<Tuple> it = pigServer.openIterator("f");
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            System.err.println("Tuple:" + t);
+            Integer group = (Integer)t.get(0);
+            Object[] groupValues = results.get(group);
+            for(int i = 0; i < 4; i++) {
+                assertEquals(groupValues[i], t.get(i+1));    
+            }
+        }
+        Util.deleteFile(cluster, input1);
+        Util.deleteFile(cluster, input2);
+    }
+    
+    @Test
+    public void testImplicitSplitInCoGroup2() throws Exception {
+        // this query is similar to the one reported in JIRA - PIG-537
+        LogicalPlanTester planTester = new LogicalPlanTester();
+        planTester.buildPlan("a = load 'file1' using PigStorage(':') as 
(name:chararray, marks:int);");
+        planTester.buildPlan("b = load 'file2' using PigStorage(':') as 
(name:chararray, rank:chararray);");
+        planTester.buildPlan("c = cogroup a by name, b by name;");
+        planTester.buildPlan("d = foreach c generate group, FLATTEN(a.marks) 
as newmarks;");
+        planTester.buildPlan("e = cogroup a by marks, d by newmarks;");
+        LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, 
flatten(a), flatten(d);");
+        
+        // Set the logical plan values correctly in all the operators
+        PlanSetter ps = new PlanSetter(plan);
+        ps.visit();
+        
+        // run through validator
+        CompilationMessageCollector collector = new CompilationMessageCollector() ;
+        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        typeValidator.validate(plan, collector) ;        
+        printMessageCollector(collector) ;
+        printTypeGraph(plan) ;
+        
+        if (collector.hasError()) {
+            throw new Exception("Error during type checking") ;
+        }
+
+        // this will run ImplicitSplitInserter
+        TestLogicalOptimizer.optimizePlan(plan);
+        
+        // get Schema of leaf and compare:
+        Schema expectedSchema = Util.getSchemaFromString("grp: int,A::username: chararray,A::marks: int,AB::group: chararray,AB::newmarks: int");
+        assertTrue(Schema.equals(expectedSchema, plan.getLeaves().get(0).getSchema(), false, true));
+    }
+}
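
For completeness, a minimal sketch of driving the rule directly against an already-built new-style logical plan, e.g. from a unit test that does not need the MiniCluster. The helper class and method names here are hypothetical; the calls to match(), getNewTransformer(), check() and transform() are the public methods defined in the patch above.

    import java.util.List;

    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.newplan.OperatorPlan;
    import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
    import org.apache.pig.newplan.optimizer.Transformer;

    // Hypothetical test helper: applies ImplicitSplitInserter to a plan by hand.
    public class ImplicitSplitDriverSketch {
        public static void applyImplicitSplits(OperatorPlan plan) throws FrontendException {
            ImplicitSplitInserter rule = new ImplicitSplitInserter("ImplicitSplitInserter");
            // match() remembers the plan inside the rule and returns one
            // single-operator sub-plan per non-split node with 2+ successors.
            List<OperatorPlan> matches = rule.match(plan);
            Transformer transformer = rule.getNewTransformer();
            for (OperatorPlan matched : matches) {
                if (transformer.check(matched)) {
                    transformer.transform(matched);
                }
            }
        }
    }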

